# Subsetting and Exploring Yelp Data

In [35]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import explode, split, col, concat_ws, count
import os
import re

In [2]:
ss = SparkSession.builder \
    .appName("Yelp Data Analysis") \
    .config("spark.driver.bindAddress", "localhost") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/28 14:09:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Get DF for yelp_academic_dataset_business.json
df = ss.read.json("/Users/scampione/MSDS/Spring_24_2/Entrepreneurship/yelp_dataset/yelp_academic_dataset_business.json")

In [283]:
df.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

In [290]:
ss.sql("DROP TABLE IF EXISTS yelp_business")

# df.write.option('path', '/yelp_dataset/yelp_business_df').saveAsTable('yelp_business')
df.write.saveAsTable('yelp_business')

SparkRuntimeException: [LOCATION_ALREADY_EXISTS] Cannot name the managed table as `spark_catalog`.`default`.`yelp_business`, as its associated location 'file:/Users/scampione/MSDS/Spring_24_2/Entrepreneurship/spark-warehouse/yelp_business' already exists. Please pick a different table name, or remove the existing location first.

In [291]:
ss.sql("SELECT city, COUNT(*) FROM parquet.`/Users/scampione/MSDS/Spring_24_2/Entrepreneurship/yelp_dataset/yelp_business_df` GROUP BY city ORDER BY count(*) DESC ").show(5)

+------------+--------+
|        city|count(1)|
+------------+--------+
|Philadelphia|   14569|
|      Tucson|    9250|
|       Tampa|    9050|
|Indianapolis|    7540|
|   Nashville|    6971|
+------------+--------+
only showing top 5 rows



In [292]:
query = """ 
        SELECT categories, count(*)
        FROM parquet.`/Users/scampione/MSDS/Spring_24_2/Entrepreneurship/yelp_dataset/yelp_business_df`
        WHERE city = 'Philadelphia'
        GROUP BY categories
        """

ss.sql(query).show()



+--------------------+--------+
|          categories|count(1)|
+--------------------+--------+
|Home Services, Ho...|       1|
|Chicken Wings, Fa...|       2|
|Auto Customizatio...|       1|
|Health & Medical,...|       1|
|Tours, Active Lif...|       1|
|Food, Grocery, Sh...|       1|
|Active Life, Braz...|       1|
|Fruits & Veggies,...|       1|
|Bagels, Restauran...|       1|
|  Parks, Active Life|      17|
|Beauty & Spas, He...|       1|
|Japanese, Food, T...|       1|
|Laboratory Testin...|       3|
|Laser Hair Remova...|       1|
|Korean, Food, Res...|       1|
|Food Stands, Rest...|       2|
|Hair Removal, Wax...|       1|
|Food, Specialty F...|       1|
|Pizza, Italian, S...|       3|
|Nightlife, Restau...|       1|
+--------------------+--------+
only showing top 20 rows



                                                                                

In [293]:
query = """ 
        SELECT categories
        FROM parquet.`/Users/scampione/MSDS/Spring_24_2/Entrepreneurship/yelp_dataset/yelp_business_df`
        WHERE city = 'Philadelphia' AND categories LIKE '%Restaurant%'
        """

ss.sql(query).show(10, truncate=False)

+--------------------------------------------------------------------------------------------------+
|categories                                                                                        |
+--------------------------------------------------------------------------------------------------+
|Burgers, Caterers, Restaurants, Nightlife, American (Traditional), Bars, Event Planning & Services|
|Restaurants, American (New)                                                                       |
|Breakfast & Brunch, Restaurants                                                                   |
|Restaurants, Chicken Shop, Peruvian                                                               |
|Fast Food, Restaurants, Food, Coffee & Tea, Burgers                                               |
|Greek, Food, Mediterranean, Falafel, Restaurants, Salad, Caterers, Event Planning & Services      |
|Restaurants, American (New)                                                               

In [242]:
# Count of restauarants and bars in Philly
query = """ 
        SELECT count(*) AS total_restaurants_bars
        FROM parquet.`/Users/scampione/MSDS/Spring_24_2/Entrepreneurship/yelp_dataset/yelp_business_df`
        WHERE city = 'Philadelphia' 
         AND (categories LIKE '%Restaurant%' OR categories LIKE '%Bar%')
        """

ss.sql(query).show()

+----------------------+
|total_restaurants_bars|
+----------------------+
|                  6409|
+----------------------+



In [243]:
new_df = ss.sql("""
        SELECT categories
        FROM parquet.`/Users/scampione/MSDS/Spring_24_2/Entrepreneurship/yelp_dataset/yelp_business_df`
        WHERE city = 'Philadelphia' AND (categories LIKE '%Restaurant%' OR categories LIKE '%Bar%')
        """)

# Splitting categories and exploding them into separate rows, then selecting distinct
unique_categories = new_df.withColumn("category", explode(split("categories", ", "))) \
                      .select("category").distinct()

unique_categories.show(10, truncate=False)


+-------------------+
|category           |
+-------------------+
|Skating Rinks      |
|Fondue             |
|Hobby Shops        |
|Bubble Tea         |
|Restaurant Supplies|
|Cafeteria          |
|Salad              |
|Beauty & Spas      |
|Beer Bar           |
|Education          |
+-------------------+
only showing top 10 rows



In [244]:
ambience_df = df.select("attributes.Ambience").filter(col("attributes.Ambience").isNotNull())
ambience_df.show(5, truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Ambience                                                                                                                                                        |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|None                                                                                                                                                            |
|{'romantic': False, 'intimate': False, 'touristy': False, 'hipster': False, 'divey': False, 'classy': False, 'trendy': False, 'upscale': False, 'casual': False}|
|{'touristy': False, 'hipster': False, 'romantic': False, 'divey': False, 'intimate': False, 'trendy': False, 'upscale': False, 'classy': False, 'casual': False}|
|{'touristy': None, 'h

In [245]:
# Subset of business_ids for restauarants and bars in Philly
query = """ 
        SELECT business_id
        FROM parquet.`/Users/scampione/MSDS/Spring_24_2/Entrepreneurship/yelp_dataset/yelp_business_df`
        WHERE city = 'Philadelphia' 
         AND (categories LIKE '%Restaurant%' OR categories LIKE '%Bar%')
        """

subset_df = ss.sql(query)

In [246]:
subset_df.select('business_id').distinct().count()

6409

In [247]:
# subset_df.coalesce(1).write.csv/Users/scampione/MSDS/Spring_24_2/Entrepreneurship/subset_philly_businesses')

In [248]:
# Subset of yelp data for restauarants and bars in Philly
query = """ 
        SELECT *
        FROM parquet.`/Users/scampione/MSDS/Spring_24_2/Entrepreneurship/yelp_dataset/yelp_business_df`
        WHERE city = 'Philadelphia' 
         AND (categories LIKE '%Restaurant%' OR categories LIKE '%Bar%')
        """

subset_df_all = ss.sql(query)

In [249]:
ss.sql("DROP TABLE IF EXISTS philly_df")

subset_df_all.write.saveAsTable('philly_df')

24/03/28 13:01:35 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

## Photos metadata JSON

In [250]:
photos_meta = ss.read.json('/Users/scampione/MSDS/Spring_24_2/Entrepreneurship/yelp_photos/photos.json')

In [251]:
photos_meta.select('*').show(5)

+--------------------+--------------------+-------+--------------------+
|         business_id|             caption|  label|            photo_id|
+--------------------+--------------------+-------+--------------------+
|Nk-SJhPlDBkAZvfsA...|Nice rock artwork...| inside|zsvj7vloL4L5jhYyP...|
|yVZtL5MmrpiivyCIr...|                    |outside|HCUdRJHHm_e0OCTlZ...|
|_ab50qdWOk0DdB6XO...|      oyster shooter|  drink|vkr8T0scuJmGVvN2H...|
|SZU9c8V2GuREDN5Kg...|       Shrimp scampi|   food|pve7D6NUrafHW3EAO...|
|Gzur0f0XMkrVxIwYJ...|                    |   food|H52Er-uBg6rNrHcRe...|
+--------------------+--------------------+-------+--------------------+
only showing top 5 rows



In [252]:
ss.sql("DROP TABLE IF EXISTS photos_meta")
photos_meta.write.saveAsTable('photos_meta')

In [253]:
philly_photos = photos_meta.join(subset_df, on='business_id')

In [254]:
philly_photos.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- caption: string (nullable = true)
 |-- label: string (nullable = true)
 |-- photo_id: string (nullable = true)



In [255]:
ss.sql("DROP TABLE IF EXISTS philly_photos")

philly_photos.write.saveAsTable('philly_photos')

In [256]:
# Number businesses with at least 5 photos
ss.sql("SELECT business_id, count(photo_id) FROM philly_photos GROUP BY business_id HAVING count(photo_id) >= 5").count()

1307

In [257]:
ss.sql("SELECT business_id, count(photo_id) FROM philly_photos GROUP BY business_id HAVING count(photo_id) >= 5").show(5)

+--------------------+---------------+
|         business_id|count(photo_id)|
+--------------------+---------------+
|mhm7pNRVZhSr3RBo9...|             37|
|VwZ5NDbIu3elGQI6M...|             11|
|-fjIX9bvBKwXjQWfd...|              9|
|r3X75SK37buS94mDl...|             12|
|3FKIev7ZB_KE6XHL9...|             15|
+--------------------+---------------+
only showing top 5 rows



In [258]:
query = """
        SELECT philly_photos.business_id, count(philly_photos.photo_id) 
        FROM philly_photos
        JOIN yelp_business ON philly_photos.business_id = yelp_business.business_id
        GROUP BY philly_photos.business_id 
         HAVING count(philly_photos.photo_id) > 5
        
        """

ss.sql(query).show(5)

+--------------------+---------------+
|         business_id|count(photo_id)|
+--------------------+---------------+
|mhm7pNRVZhSr3RBo9...|             37|
|VwZ5NDbIu3elGQI6M...|             11|
|-fjIX9bvBKwXjQWfd...|              9|
|r3X75SK37buS94mDl...|             12|
|3FKIev7ZB_KE6XHL9...|             15|
+--------------------+---------------+
only showing top 5 rows



In [259]:
# philly_photos.select('business_id', 'photo_id').distinct().coalesce(1).write.csv('/Users/scampione/MSDS/Spring_24_2/Entrepreneurship/subset_philly_businesses_with_photos')

Examples from images notebook

In [260]:
query = """ 
        SELECT name
        FROM parquet.`/Users/scampione/MSDS/Spring_24_2/Entrepreneurship/yelp_dataset/yelp_business_df`
        WHERE business_id = 'QtST1igZAi0q9LxJr6srIQ'
        """

ss.sql(query).show()

+------------------+
|              name|
+------------------+
|Pearl's Oyster Bar|
+------------------+



In [261]:
df.where("business_id = 'QtST1igZAi0q9LxJr6srIQ'").select('name','categories').show(truncate=False)

+------------------+-----------------------------------------------------------------------+
|name              |categories                                                             |
+------------------+-----------------------------------------------------------------------+
|Pearl's Oyster Bar|Restaurants, Seafood, American (New), Live/Raw Food, Breakfast & Brunch|
+------------------+-----------------------------------------------------------------------+



In [262]:
query = """ 
        SELECT name
        FROM parquet.`/Users/scampione/MSDS/Spring_24_2/Entrepreneurship/yelp_dataset/yelp_business_df`
        WHERE business_id = '_qgqa4X1OiMLqub1hHaMjg'
        """

ss.sql(query).show()

+-------------+
|         name|
+-------------+
|Res Ipsa Cafe|
+-------------+



In [263]:
df.where("business_id = '_qgqa4X1OiMLqub1hHaMjg'").select('name','categories', 'attributes.Ambience').show()

+-------------+--------------------+--------------------+
|         name|          categories|            Ambience|
+-------------+--------------------+--------------------+
|Res Ipsa Cafe|Restaurants, Coff...|{'touristy': Fals...|
+-------------+--------------------+--------------------+



## Merged Philly business data and photo metadata

In [264]:
photos_meta.show(5)

+--------------------+--------------------+-------+--------------------+
|         business_id|             caption|  label|            photo_id|
+--------------------+--------------------+-------+--------------------+
|Nk-SJhPlDBkAZvfsA...|Nice rock artwork...| inside|zsvj7vloL4L5jhYyP...|
|yVZtL5MmrpiivyCIr...|                    |outside|HCUdRJHHm_e0OCTlZ...|
|_ab50qdWOk0DdB6XO...|      oyster shooter|  drink|vkr8T0scuJmGVvN2H...|
|SZU9c8V2GuREDN5Kg...|       Shrimp scampi|   food|pve7D6NUrafHW3EAO...|
|Gzur0f0XMkrVxIwYJ...|                    |   food|H52Er-uBg6rNrHcRe...|
+--------------------+--------------------+-------+--------------------+
only showing top 5 rows



In [265]:
ss.sql('DROP TABLE IF EXISTS business_ids_with_5_plus')

business_ids_with_5_plus =ss.sql('SELECT photos_meta.business_id AS business_id  FROM photos_meta GROUP BY photos_meta.business_id  HAVING count(photos_meta.photo_id) >= 5')

business_ids_with_5_plus.write.saveAsTable('business_ids_with_5_plus')

In [266]:
query = """
        SELECT photos_meta.business_id AS philly_business_id, 
               photos_meta.photo_id, photos_meta.caption,  photos_meta.label,
               philly_df.*
        FROM philly_df
        JOIN photos_meta        
         ON philly_df.business_id = photos_meta.business_id
        WHERE philly_df.business_id IN (SELECT business_id FROM business_ids_with_5_plus)
        """

merged = ss.sql(query)

In [267]:
ss.sql('DROP TABLE IF EXISTS merged')

merged.write.saveAsTable('merged')

In [268]:
query = """ 
        SELECT business_id, collect_list(photo_id) AS photo_ids
        FROM merged
        GROUP BY business_id
        
        """

photo_ids_by_business = ss.sql(query)

In [269]:
photo_ids_by_business.show(10)

+--------------------+--------------------+
|         business_id|           photo_ids|
+--------------------+--------------------+
|-0TffRSXXIlBYVbb5...|[eznc-C7O6FY9WDh_...|
|-1B9pP_CrRBJYPICE...|[bg5-ER5xlOw6h6ZF...|
|-1b2kNOowsPrPpBOK...|[C6wAPyFFL_7K9CX4...|
|-2-ih3mE8KPyeKVIz...|[7j89K2-fBjtvmCrJ...|
|-5Rah4ZvWsDu4oilU...|[I8CjUQWNIavQY7qV...|
|-ATiAtTikuGuqvaW2...|[MlbUtrJPlJF7NFJU...|
|-DGsnMlRrR_tv8avr...|[Mb0f-rLNLqC2pHVd...|
|-FMI0_EhMjwXaAgHV...|[3enlAfQNr6s2OlhJ...|
|-NBNpfXO_WlUJFprq...|[_WwUjpREHVVSASrz...|
|-OIUunijjcq_ZzyyQ...|[pe1aHSdpwHwJ3yBs...|
+--------------------+--------------------+
only showing top 10 rows



In [271]:
# Convert the array to string using a delimiter (e.g., comma)
photo_ids_by_business = photo_ids_by_business.withColumn("photo_ids", concat_ws(",", col("photo_ids")))

# Write the DataFrame to a CSV file
photo_ids_by_business.coalesce(1).write.csv('/Users/scampione/MSDS/Spring_24_2/Entrepreneurship/photo_ids_by_business.csv', header=True)


## Make new directory for files

In [274]:
import os
import shutil

photo_base_dir = '/Users/scampione/MSDS/Spring_24_2/Entrepreneurship/yelp_photos/photos'

new_base_dir = '/Users/scampione/MSDS/Spring_24_2/Entrepreneurship/philly_business_photos'

for row in photo_ids_by_business.collect():
    business_id = row['business_id']
    photo_ids = row['photo_ids'].split(',')  # Assuming photo_ids are comma-separated
    photo_files = [f'{id}.jpg' for id in photo_ids]
    
    # Create a new subdirectory for the business
    business_dir = os.path.join(new_base_dir, business_id)
    os.makedirs(business_dir, exist_ok=True)
    
    # Copy each photo to the new subdirectory
    for photo_file in photo_files:
        # Construct file paths
        source_path = os.path.join(photo_base_dir, photo_file)
        destination_path = os.path.join(business_dir, photo_file)
        
        # Check if the source file exists and then copy
        if os.path.exists(source_path):
            shutil.copy(source_path, destination_path)


In [275]:
ss.stop()

## Get reviews for subset of Philly Restaurants

In [None]:
reviews_df = ss.read.json('/Users/scampione/MSDS/Spring_24_2/Entrepreneurship/yelp_dataset/yelp_academic_dataset_review.json')

                                                                                

In [None]:
photo_ids_by_business = ss.read.csv('/Users/scampione/MSDS/Spring_24_2/Entrepreneurship/photo_ids_by_business.csv', inferSchema=True, header=True)

In [30]:
photo_ids_by_business.count()

1307

In [27]:
photo_ids_by_business = photo_ids_by_business.withColumn("business_id", photo_ids_by_business["business_id"].cast("string"))
reviews_df = reviews_df.withColumn("business_id", reviews_df["business_id"].cast("string"))

# Perform an inner join to filter reviews for businesses with 5+ photos
reviews_with_photos = reviews_df.join(photo_ids_by_business, "business_id", "inner")

# Show the filtered DataFrame
reviews_with_photos.show(10)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|           photo_ids|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+--------------------+
|04UD14gamNjLY0IDY...|   1|2015-09-23 23:10:31|    2|JrIxlS1TzJ-iCu79u...|  1.0|I am a long term ...|     1|eUta8W_HdHMXPzLBB...|8LeA8ICQnDr7ndhi2...|
|RZtGWDLCAtuipwaZ-...|   0|2009-10-14 19:57:14|    0|8JFGBuHMoiNDyfcxu...|  4.0|Good food--loved ...|     0|smOvOajNG0lS4Pq7d...|bDiREYJUr-kN1Orbc...|
|YtSqYv1Q_pOltsVPS...|   0|2013-06-24 11:21:25|    0|oyaMhzBSwfGgemSGu...|  5.0|Tremendous servic...|     0|Dd1jQj7S-BFGqRbAp...|nk6KCQI4gnDawWoU5...|
|oBhJuukGRqPVvYBfT...|   0|2015-03-05 03:37:54|    0|YcLXh-3UC9y6YFAI9...|  4.0|The only reaso

In [31]:
# Number of reviews for Philly Restaurants and Bars
reviews_with_photos.count()

                                                                                

427057

In [44]:
reviews_with_photos.groupBy('business_id').agg(count('review_id').alias('count_reviews')).sort('count_reviews',ascending=False).show(truncate=False)



+----------------------+-------------+
|business_id           |count_reviews|
+----------------------+-------------+
|ytynqOUb3hjKeJfRj5Tshw|5778         |
|PP3BBaVxZLcJU54uP_wL6Q|4293         |
|IkY2ticzHEn4QFn8hQLSWg|3428         |
|9PZxjhTIU7OgPIzuGi89Ew|3264         |
|ctHjyadbDQAtUFfkcAFEHw|3173         |
|6ajnOk0GcY9xbb5Ocaw8Gw|2974         |
|j-qtdD55OLfSqfsWuQTDJg|2884         |
|AGlh4ZDv6jnoiYfz7At9mw|2778         |
|sTPueJEwcRDj7ZJmG7okYA|2769         |
|0RuvlgTnKFbX3IK0ZOOocA|2733         |
|RQAF6a0akMiot5lZZnMNNw|2725         |
|8pqdJjefYq-a9IBSJJmKwA|2577         |
|ntiIq1FNqduOyyowMFGh5A|2255         |
|IWHdx0NhDKADkGOgXgOFKQ|2245         |
|OdIBX09glfXNVSyd0RnIeg|2241         |
|S8ZFYEgMejpChID8tzKo9A|2162         |
|kZ1q0K13tFYG_ZJrVvsJHA|2076         |
|i_FWONQD1ZBqrNE2b-M5Ug|1964         |
|q-zV08jt6U-q05SMEuQJAQ|1952         |
|6_T2xzR74JqGCTPefAD8Tw|1943         |
+----------------------+-------------+
only showing top 20 rows



                                                                                

In [49]:
df = ss.read.json("/Users/scampione/MSDS/Spring_24_2/Entrepreneurship/yelp_dataset/yelp_academic_dataset_business.json")

In [80]:
df.select('name', 'categories').where("business_id = 'ctHjyadbDQAtUFfkcAFEHw'").show(truncate=False)

+-----+----------------------------------------------------------------------------------------------------+
|name |categories                                                                                          |
+-----+----------------------------------------------------------------------------------------------------+
|Zahav|Nightlife, Bars, Food, Ethnic Food, Middle Eastern, Vegetarian, Specialty Food, Israeli, Restaurants|
+-----+----------------------------------------------------------------------------------------------------+



In [81]:
five_reivews_for_PP3BBaVxZLcJU54uP_wL6Q = reviews_with_photos.select('text').where("business_id = 'ctHjyadbDQAtUFfkcAFEHw'")

In [82]:
texts = five_reivews_for_PP3BBaVxZLcJU54uP_wL6Q.collect()

                                                                                

In [73]:
def clean_text(text):
    text = text.replace("\\'", "'")
    text = text.replace("\n", " ")
    text = re.sub(r"[^a-zA-Z0-9.,'?!;: ]", "", text)
    text = re.sub(r"\s+", " ", text)
    text = text.lower()    
    return text


In [83]:
cleaned_text = clean_text(texts[0][0])
print(cleaned_text)


date night with girlfriend here was perfect. i wouldn't say im so much a foodie but i love to eat good food. 9:45 reservation, walked right to our table. exceptional service from start to finish. jeff was our server he was great. we both did the 39 plan and we started with humus tahani and the most amazing phenomenal soft warm pita bread. next we each chose 2 small plates fried broccoli, cucumber and yogurt, short rips and potatoes which melted in my mouth, and chicken pastilla! they were perfect and filling. they also served us side mediterterannean salad items carrots, zucchini, etc lamb and beef kofte followed these. incredible and the most amazing flourless frozen almond chocolate cake and some kind of amazing ice cream.... well basically, im a 25 year old student and it was a pleasure spending 122 on a fantastic meal in an incredible atmosphere with a pretty ambiance with above average service. would most definitely recommend


In [84]:
ss.stop()

Get top 20 sorted by num top_stars/review_counts