In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from afinn import Afinn

In [3]:
spark = SparkSession.builder.getOrCreate()

In [44]:
# Loading JSON files into Spark DataFrames
businessDF = spark.read.json("yelp_academic_dataset_business.json")
reviewDF = spark.read.json("yelp_academic_dataset_review.json")

In [82]:
restaurantsDF = businessDF.filter(businessDF.categories.contains('Restaurants')).selectExpr('business_id', "stars as b_stars", 'is_open', 'name', 'city', 'state', 'review_count')

restaurants_reviewDF = reviewDF.join(restaurantsDF, 'business_id', 'inner').select('business_id', 'review_id', 'stars', 'useful', 'cool', 'funny')

In [56]:
# Register template so we can use SQL for queries
restaurantsDF.registerTempTable("restaurants")
restaurants_reviewDF.registerTempTable("reviews")

In [57]:
# Number of restaurants in the dataset
total_restaurants = restaurantsDF.select('business_id').distinct().count()
print("Total restaurants: ", total_restaurants)

Total restaurants:  63944


In [58]:
# Number of reviews in the dataset
total_reviews = restaurants_reviewDF.select('review_id').distinct().count()
print("Total reviews: ", total_reviews)

Total reviews:  5055992


In [60]:
# Show the top 10 cities with the most reviews in SQL
spark.sql(" SELECT \
                state, \
                city, \
                COUNT(*) AS total_reviews \
            FROM \
                restaurants \
            GROUP BY \
                state, \
                city \
            ORDER BY \
                COUNT(*) DESC ").show(10)

+-----+-----------+-------------+
|state|       city|total_reviews|
+-----+-----------+-------------+
|   ON|    Toronto|         8678|
|   NV|  Las Vegas|         6930|
|   AZ|    Phoenix|         4216|
|   QC|   Montréal|         4097|
|   AB|    Calgary|         3115|
|   NC|  Charlotte|         2920|
|   PA| Pittsburgh|         2608|
|   AZ| Scottsdale|         1652|
|   ON|Mississauga|         1612|
|   OH|  Cleveland|         1549|
+-----+-----------+-------------+
only showing top 10 rows



In [66]:
# Show restaurants with most 5-star reviews
reviews_5stars = restaurants_reviewDF.filter('stars >= 5')
reviews_grouped = reviews_5stars.groupby('business_id').count()

restaurants_joined = restaurantsDF.join(reviews_grouped, 'business_id', 'inner')
restaurants_sorted = restaurants_joined.sort('count', ascending=False)

restaurants_sorted.show(3, truncate=False)

+----------------------+-------+-----------------------------+---------+-----+------------+-----+
|business_id           |b_stars|name                         |city     |state|review_count|count|
+----------------------+-------+-----------------------------+---------+-----+------------+-----+
|4JNXUYY8wbaaDmk3BPzlWw|4.0    |Mon Ami Gabi                 |Las Vegas|NV   |9264        |4501 |
|RESDUcs7fIiihp38-d6_6g|4.0    |Bacchanal Buffet             |Las Vegas|NV   |10129       |4247 |
|faPVqws-x-5k2CQKDNtHxw|4.5    |Yardbird Southern Table & Bar|Las Vegas|NV   |4828        |3415 |
+----------------------+-------+-----------------------------+---------+-----+------------+-----+
only showing top 3 rows



In [67]:
# Storing top 3 businesses ID
monamiID = '4JNXUYY8wbaaDmk3BPzlWw'
bacchanalID = 'RESDUcs7fIiihp38-d6_6g'
yardbirdID = 'faPVqws-x-5k2CQKDNtHxw'

In [69]:
## Hypothesis 1: Good businesses have more positive sentiment from reviews
afinn = Afinn()
afinnUDF = udf(afinn.score, FloatType())

review_selected = reviewDF.select('review_id', 'business_id', 'text')
review_sentiment = review_selected.withColumn('sentiment', afinnUDF('text'))
review_sentiment.show(10)

+--------------------+--------------------+--------------------+---------+
|           review_id|         business_id|                text|sentiment|
+--------------------+--------------------+--------------------+---------+
|xQY8N_XvtGbearJ5X...|-MhfebM0QIsKt87iD...|As someone who ha...|     23.0|
|UmFMZ8PyXZTY2Qcwz...|lbrU8StCq3yDfr-QM...|I am actually hor...|    -11.0|
|LG2ZaYiOgpr2DK_90...|HQl28KMwrEKHqhFrr...|I love Deagan's. ...|     16.0|
|i6g_oA9Yf9Y31qt0w...|5JxlZaqCnk1MnbgRi...|Dismal, lukewarm,...|     -7.0|
|6TdNDKywdbjoTkize...|IS4cv902ykd8wj1TR...|Oh happy day, fin...|     24.0|
|L2O_INwlrRuoX05KS...|nlxHRv1zXGT0c0K51...|This is definitel...|      7.0|
|ZayJ1zWyWgY9S_TRL...|Pthe4qk5xh4n-ef-9...|Really good place...|     15.0|
|lpFIJYpsvDxyph-kP...|FNCJpSn0tL9iqoY3J...|Awesome office an...|     12.0|
|JA-xnyHytKiOIHl_z...|e_BiI4ej1CW1F0EyV...|Most delicious au...|      5.0|
|z4BCgTkfNtCu4XY5L...|Ws8V970-mQt2X9CwC...|I have been here ...|     15.0|
+--------------------+---

In [8]:
# Register template so we may use SQL
review_sentiment.registerTempTable("review_sentiment")

In [70]:
# A little cleanup to reduce the size before calculating total average (not sure if necessary)
sentiment_cleaned = review_sentiment.select('review_id', 'sentiment')
sentiment_cleaned.show()

+--------------------+---------+
|           review_id|sentiment|
+--------------------+---------+
|xQY8N_XvtGbearJ5X...|     23.0|
|UmFMZ8PyXZTY2Qcwz...|    -11.0|
|LG2ZaYiOgpr2DK_90...|     16.0|
|i6g_oA9Yf9Y31qt0w...|     -7.0|
|6TdNDKywdbjoTkize...|     24.0|
|L2O_INwlrRuoX05KS...|      7.0|
|ZayJ1zWyWgY9S_TRL...|     15.0|
|lpFIJYpsvDxyph-kP...|     12.0|
|JA-xnyHytKiOIHl_z...|      5.0|
|z4BCgTkfNtCu4XY5L...|     15.0|
|TfVth7UNfgilv4J3e...|      8.0|
|Tyx7AxYQfSRnBFUIX...|     32.0|
|wJMjt5C2y1RKgY0Xb...|     14.0|
|QCxPzh7cuxJrLd6A_...|     14.0|
|qWHp2l2lysENZObh6...|     -2.0|
|mjbs5CL4eMu4o6_Vt...|     -1.0|
|bVTjZgRNq8Toxzvti...|     -4.0|
|Ne_2CSfcKIqXHmv_K...|     18.0|
|Hy-gUXQh3RVhE8FLH...|     -1.0|
|UGErdm6bt48SXTVwJ...|     15.0|
+--------------------+---------+
only showing top 20 rows



In [80]:
# Calculate average sentiment of all restaurants
# VERY SLOW
sentiment_avg = sentiment_cleaned.agg({'sentiment': 'avg'})
sentiment_avg.show()

+------------------+
|    avg(sentiment)|
+------------------+
|10.036476817083695|
+------------------+



In [72]:
# 1.1 Bacchanal Buffet
bacchanal_reviews_sentiment = review_sentiment.filter(review_sentiment['business_id'] == bacchanalID)
bacchanal_sentiment_avg = bacchanal_reviews_sentiment.agg({'sentiment': 'avg'})
bacchanal_sentiment_avg.show()

+------------------+
|    avg(sentiment)|
+------------------+
|12.718441009887684|
+------------------+



In [16]:
# 1.2 Mon Ami Gabi
monami_reviews_sentiment = review_sentiment.filter(review_sentiment['business_id'] == monamiID)
monami_sentiment_avg = monami_reviews_sentiment.agg({'sentiment': 'avg'})
monami_sentiment_avg.show()

+------------------+
|    avg(sentiment)|
+------------------+
|15.043414429530202|
+------------------+



In [73]:
# 1.3 Yardbird Southern Table
yardbird_reviews_sentiment = review_sentiment.filter(review_sentiment['business_id'] == yardbirdID)
yardbird_sentiment_avg = yardbird_reviews_sentiment.agg({'sentiment': 'avg'})
yardbird_sentiment_avg.show()

+------------------+
|    avg(sentiment)|
+------------------+
|16.034745932918256|
+------------------+



In [75]:
# Hypothesis 2: Good businesses have more Useful, Funny, and Cool reviews.
votes_avg = reviewDF.select('business_id', 'review_id', 'useful', 'funny', 'cool')
votes_agg = votes_avg.agg({'useful': 'avg', 'funny': 'avg', 'cool': 'avg'})
votes_agg.show()

+------------------+------------------+------------------+
|         avg(cool)|       avg(useful)|        avg(funny)|
+------------------+------------------+------------------+
|0.5745620126461111|1.3228823847835751|0.4596423044057926|
+------------------+------------------+------------------+



In [76]:
# 2.1 Bacchanal Buffet
bucchanal_filtered = reviewDF.filter(reviewDF['business_id'] == bacchanalID)
bucchanal_selected = bucchanal_filtered.select('business_id', 'review_id', 'useful', 'funny', 'cool')
bucchanal_agg = bucchanal_selected.agg({'useful': 'avg', 'funny': 'avg', 'cool': 'avg'})
bucchanal_agg.show()

+-----------------+-----------------+------------------+
|        avg(cool)|      avg(useful)|        avg(funny)|
+-----------------+-----------------+------------------+
|0.759719688969953|1.419602572717673|0.6408754919842565|
+-----------------+-----------------+------------------+



In [77]:
# 2.2 Mon Ami Gabi
monami_filtered = reviewDF.filter(reviewDF['business_id'] == monamiID)
monami_selected = monami_filtered.select('business_id', 'review_id', 'useful', 'funny', 'cool')
monami_agg = monami_selected.agg({'useful': 'avg', 'funny': 'avg', 'cool': 'avg'})
monami_agg.show()

+------------------+------------------+------------------+
|         avg(cool)|       avg(useful)|        avg(funny)|
+------------------+------------------+------------------+
|0.5479236577181208|0.7775796979865772|0.3859060402684564|
+------------------+------------------+------------------+



In [78]:
# 2.3 Yardbird Southern Table
yardbird_filtered = reviewDF.filter(reviewDF['business_id'] == yardbirdID)
yardbird_selected = yardbird_filtered.select('business_id', 'review_id', 'useful', 'funny', 'cool')
yardbird_agg = yardbird_selected.agg({'useful': 'avg', 'funny': 'avg', 'cool': 'avg'})
yardbird_agg.show()

+------------------+------------------+------------------+
|         avg(cool)|       avg(useful)|        avg(funny)|
+------------------+------------------+------------------+
|0.6519381401887929|0.8567985539264913|0.4081140791323559|
+------------------+------------------+------------------+



In [130]:
GeoPointsChart()

NameError: name 'GeoPointsChart' is not defined