# Data Wrangling

In [1]:
import collections
import json
import re
from collections import Counter
from collections.abc import Iterable

import nltk
import pandas as pd
import pyspark
from nltk.tokenize import word_tokenize
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, arrays_overlap, lit
from pyspark.sql.functions import array_contains
from pyspark.sql.functions import col, split
from pyspark.sql.functions import udf

In [2]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [3]:
def flatten(l):
    """Lazily yield the leaf items of an arbitrarily nested iterable.

    Any element that is itself iterable is descended into recursively;
    everything else is yielded unchanged, in encounter order.

    Parameters
    ----------
    l : iterable
        The (possibly nested) container to walk.

    Yields
    ------
    object
        Each non-iterable element, plus every ``str``/``bytes`` value as a
        single item.
    """
    for el in l:
        # str and bytes are iterable, but they are values here, not containers.
        # Iterable comes from collections.abc: the old collections.Iterable
        # alias no longer exists on Python 3.10+.
        if isinstance(el, Iterable) and not isinstance(el, (str, bytes)):
            yield from flatten(el)
        else:
            yield el

def flatten_cats(df, col):
    """Collect one DataFrame column to the driver and return it as a flat list.

    Parameters
    ----------
    df : DataFrame
        Spark DataFrame to read from.
    col : str
        Name of the column to select. Inside this function the parameter
        shadows the ``col`` helper imported from ``pyspark.sql.functions``.

    Returns
    -------
    list
        Every value of the column, with nested lists expanded by ``flatten``.
    """
    # Each Row of the one-column selection is unpacked into its single value.
    collected = df.select(col).rdd.flatMap(lambda row: row).collect()
    return [item for item in flatten(collected)]

## Loading Data into Spark DataFrames

In [4]:
bizDF = spark.read.json('../DownloadedFiles/business.json')

In [5]:
reviewDF = spark.read.json('../DownloadedFiles/review.json')

In [6]:
#print('bizDF shape:', bizDF.count(), len(bizDF.columns))
#print('reviewDF shape:', reviewDF.count(), len(reviewDF.columns))

#RESULT
#bizDF shape: 192609 14
#reviewDF shape: 6685900 9

In [7]:
bizDF.groupBy().sum('review_count').show()

+-----------------+
|sum(review_count)|
+-----------------+
|          6459906|
+-----------------+



Why is the number of reviews in the review file (6,685,900) higher than the sum of the per-business `review_count` values (6,459,906)?

In [8]:
revs_by_biz = reviewDF.groupBy('business_id').count()

In [9]:
#number of businesses in review dataframe -- should be 192609
revs_by_biz.count()

192606

3 more businesses in the business file than in the review file?

In [10]:
merged = bizDF.join(revs_by_biz, on = 'business_id',how = 'outer')

In [11]:
merged.count()

192609

In [12]:
merged.columns

['business_id',
 'address',
 'attributes',
 'categories',
 'city',
 'hours',
 'is_open',
 'latitude',
 'longitude',
 'name',
 'postal_code',
 'review_count',
 'stars',
 'state',
 'count']

In [13]:
# diff = review_count from the business file minus the number of reviews
# counted per business in the review file (null where 'count' is null).
not_equal = merged.withColumn(
    'diff', merged['review_count'] - merged['count']
).select('business_id', 'name', 'review_count', 'count', 'diff', 'state')
not_equal.orderBy('diff').show()

+--------------------+--------------------+------------+-----+----+-----+
|         business_id|                name|review_count|count|diff|state|
+--------------------+--------------------+------------+-----+----+-----+
|Pah1N0Di1WA3wsI5T...|   Gem and Bead Mall|           7| null|null|   NV|
|YSilq0Was9b4Q7oId...|Zelma Basha Samer...|           4| null|null|   AZ|
|W6q7CANl3UbQa5hGC...|           Starbucks|          17| null|null|   AZ|
|RESDUcs7fIiihp38-...|    Bacchanal Buffet|        8339| 8568|-229|   NV|
|4JNXUYY8wbaaDmk3B...|        Mon Ami Gabi|        8348| 8570|-222|   NV|
|5LNZ67Yw9RD6nf4_U...|The Cosmopolitan ...|        4322| 4522|-200|   NV|
|K7lWdNUhCbcnEvI0N...|        Wicked Spoon|        6708| 6887|-179|   NV|
|JDZ6_yycNQFTpUZzL...|   El Dorado Cantina|        2623| 2801|-178|   NV|
|7sPNbCx7vGAaH7SbN...|        Bachi Burger|        3333| 3494|-161|   NV|
|MpmFFw0GE_2iRFPds...|        XS Nightclub|        3055| 3212|-157|   NV|
|rcaPajgKOJC2vo_l3...|             Bou

In [14]:
not_equal.count()

192609

In [15]:
not_equal.where(not_equal.diff > 0).show()

+--------------------+--------------------+------------+-----+----+-----+
|         business_id|                name|review_count|count|diff|state|
+--------------------+--------------------+------------+-----+----+-----+
|tthMvRgSuLm5RNCbL...|  Gino's Auto Repair|          24|   23|   1|   NC|
|rj5YbrxqvfOVgcrgz...|           McDonalds|           5|    4|   1|   NV|
|b3TMNEZJBnzP3naW-...|Advanced Orthoped...|          37|   36|   1|   NV|
|LBQRwD514qV6VMmCG...|Omizu Japanese Re...|          14|   13|   1|   OH|
|yazoblxDJm2Xg8ub4...|          Jiffy Lube|          17|   16|   1|   AZ|
|VnpZg4h3O5AlcMjJY...|         M&M Customs|           3|    2|   1|   NC|
|1xB0Qf2-TN6CQ5c4F...|Taiyaki NYC - Tor...|          56|   55|   1|   ON|
+--------------------+--------------------+------------+-----+----+-----+



In [16]:
#number of businesses by state/province in dataset
byState = bizDF.groupBy('state').count()
byState.sort('count').show()

#number of states/provinces in dataset
byState.count()

+-----+-----+
|state|count|
+-----+-----+
|  DUR|    1|
|   NJ|    1|
|  DOW|    1|
|   BC|    1|
|   NM|    1|
|  BAS|    1|
|  XGL|    1|
|   AR|    1|
|   TN|    1|
|  CON|    1|
|   UT|    1|
|  XWY|    2|
|   VT|    2|
|   NE|    2|
|   GA|    2|
|   VA|    2|
|   AK|    2|
|   WA|    3|
|   CT|    3|
|   AL|    3|
+-----+-----+
only showing top 20 rows



36

## Filtering to Get just Restaurants

In [17]:
bizDF[['categories']].show(10)

+--------------------+
|          categories|
+--------------------+
|   Golf, Active Life|
|Specialty Food, R...|
|Sushi Bars, Resta...|
|Insurance, Financ...|
|Plumbing, Shoppin...|
|Shipping Centers,...|
|Beauty & Spas, Ha...|
|Hair Salons, Hair...|
|Nail Salons, Beau...|
|Beauty & Spas, Na...|
+--------------------+
only showing top 10 rows



In [18]:
bizDF.dtypes

[('address', 'string'),
 ('attributes',
  'struct<AcceptsInsurance:string,AgesAllowed:string,Alcohol:string,Ambience:string,BYOB:string,BYOBCorkage:string,BestNights:string,BikeParking:string,BusinessAcceptsBitcoin:string,BusinessAcceptsCreditCards:string,BusinessParking:string,ByAppointmentOnly:string,Caters:string,CoatCheck:string,Corkage:string,DietaryRestrictions:string,DogsAllowed:string,DriveThru:string,GoodForDancing:string,GoodForKids:string,GoodForMeal:string,HairSpecializesIn:string,HappyHour:string,HasTV:string,Music:string,NoiseLevel:string,Open24Hours:string,OutdoorSeating:string,RestaurantsAttire:string,RestaurantsCounterService:string,RestaurantsDelivery:string,RestaurantsGoodForGroups:string,RestaurantsPriceRange2:string,RestaurantsReservations:string,RestaurantsTableService:string,RestaurantsTakeOut:string,Smoking:string,WheelchairAccessible:string,WiFi:string>'),
 ('business_id', 'string'),
 ('categories', 'string'),
 ('city', 'string'),
 ('hours',
  'struct<Friday:st

In [19]:
# Turn the comma-separated 'categories' string into an array column.
# The pattern is a raw string: '\s' in a plain literal is an invalid escape
# sequence and triggers a DeprecationWarning/SyntaxWarning on recent Pythons.
bizDF = bizDF.withColumn('categoriesList', split(col('categories'), r',\s*'))

In [20]:
catList = bizDF.select('categoriesList').rdd.flatMap(lambda x: x).collect()

In [21]:
catList

[['Golf', 'Active Life'],
 ['Specialty Food',
  'Restaurants',
  'Dim Sum',
  'Imported Food',
  'Food',
  'Chinese',
  'Ethnic Food',
  'Seafood'],
 ['Sushi Bars', 'Restaurants', 'Japanese'],
 ['Insurance', 'Financial Services'],
 ['Plumbing',
  'Shopping',
  'Local Services',
  'Home Services',
  'Kitchen & Bath',
  'Home & Garden',
  'Water Heater Installation/Repair'],
 ['Shipping Centers',
  'Couriers & Delivery Services',
  'Local Services',
  'Printing Services'],
 ['Beauty & Spas', 'Hair Salons'],
 ['Hair Salons',
  'Hair Stylists',
  'Barbers',
  "Men's Hair Salons",
  'Cosmetics & Beauty Supply',
  'Shopping',
  'Beauty & Spas'],
 ['Nail Salons', 'Beauty & Spas', 'Day Spas'],
 ['Beauty & Spas', 'Nail Salons', 'Day Spas', 'Massage'],
 ['Local Services',
  'Professional Services',
  'Computers',
  'Shopping',
  'Home Services',
  'IT Services & Computer Repair',
  'Internet Service Providers',
  'Web Design'],
 ['Restaurants',
  'Breakfast & Brunch',
  'Mexican',
  'Tacos',
  '

In [22]:
catList_flat = list(flatten(catList))

In [23]:
catList_flat

['Golf',
 'Active Life',
 'Specialty Food',
 'Restaurants',
 'Dim Sum',
 'Imported Food',
 'Food',
 'Chinese',
 'Ethnic Food',
 'Seafood',
 'Sushi Bars',
 'Restaurants',
 'Japanese',
 'Insurance',
 'Financial Services',
 'Plumbing',
 'Shopping',
 'Local Services',
 'Home Services',
 'Kitchen & Bath',
 'Home & Garden',
 'Water Heater Installation/Repair',
 'Shipping Centers',
 'Couriers & Delivery Services',
 'Local Services',
 'Printing Services',
 'Beauty & Spas',
 'Hair Salons',
 'Hair Salons',
 'Hair Stylists',
 'Barbers',
 "Men's Hair Salons",
 'Cosmetics & Beauty Supply',
 'Shopping',
 'Beauty & Spas',
 'Nail Salons',
 'Beauty & Spas',
 'Day Spas',
 'Beauty & Spas',
 'Nail Salons',
 'Day Spas',
 'Massage',
 'Local Services',
 'Professional Services',
 'Computers',
 'Shopping',
 'Home Services',
 'IT Services & Computer Repair',
 'Internet Service Providers',
 'Web Design',
 'Restaurants',
 'Breakfast & Brunch',
 'Mexican',
 'Tacos',
 'Tex-Mex',
 'Fast Food',
 'Bars',
 'Nightlife',

In [24]:
catList_unique = set(catList_flat)
len(catList_unique)

1301

In [25]:
Counter(catList_flat).most_common()

[('Restaurants', 59371),
 ('Shopping', 31878),
 ('Food', 29989),
 ('Home Services', 19729),
 ('Beauty & Spas', 19370),
 ('Health & Medical', 17171),
 ('Local Services', 13932),
 ('Automotive', 13203),
 ('Nightlife', 13095),
 ('Bars', 11341),
 ('Event Planning & Services', 10371),
 ('Active Life', 9521),
 ('Fashion', 7798),
 ('Sandwiches', 7332),
 ('Coffee & Tea', 7321),
 ('Fast Food', 7257),
 ('American (Traditional)', 7107),
 ('Hair Salons', 6955),
 ('Pizza', 6804),
 ('Home & Garden', 6489),
 ('Arts & Entertainment', 6304),
 ('Professional Services', 6276),
 ('Auto Repair', 6140),
 ('Hotels & Travel', 6033),
 ('Doctors', 5867),
 ('Real Estate', 5677),
 ('Burgers', 5404),
 ('Breakfast & Brunch', 5381),
 ('Nail Salons', 5043),
 ('Specialty Food', 4883),
 ('American (New)', 4882),
 ('Italian', 4716),
 ('Fitness & Instruction', 4646),
 ('Mexican', 4618),
 ('Chinese', 4428),
 ('Pets', 4111),
 ('Hair Removal', 4002),
 ('Bakeries', 3711),
 ('Grocery', 3609),
 ('Dentists', 3540),
 ('Skin Care

In [26]:
restBizDF = bizDF.filter(array_contains(col('categoriesList'), 'Restaurants'))

In [27]:
restBizDF.select('name', 'categoriesList').show(10)

+--------------------+--------------------+
|                name|      categoriesList|
+--------------------+--------------------+
|Emerald Chinese R...|[Specialty Food, ...|
|Musashi Japanese ...|[Sushi Bars, Rest...|
|           Taco Bell|[Restaurants, Bre...|
|       Marco's Pizza|[Italian, Restaur...|
|Carluccio's Tivol...|[Restaurants, Ita...|
|      Marathon Diner|[Sandwiches, Sala...|
|Maria's Mexican R...|[Mexican, Restaur...|
|      Bolt Fresh Bar|[Juice Bars & Smo...|
|The Steady Cafe &...|[Restaurants, Nig...|
|   Manzetti's Tavern|[Sandwiches, Ital...|
+--------------------+--------------------+
only showing top 10 rows



In [28]:
# Flat list of every category string attached to a restaurant business.
# Uses the flatten_cats helper instead of repeating its body inline.
restCatList_flat = flatten_cats(restBizDF, 'categoriesList')

In [29]:
restCatList_unique = set(restCatList_flat)
len(restCatList_unique)

761

In [30]:
restCatList_unique

{'& Probates',
 'Acai Bowls',
 'Accessories',
 'Accountants',
 'Acne Treatment',
 'Active Life',
 'Acupuncture',
 'Adult Education',
 'Adult Entertainment',
 'Advertising',
 'Afghan',
 'African',
 'Air Duct Cleaning',
 'Aircraft Repairs',
 'Airport Lounges',
 'Airport Shuttles',
 'Airport Terminals',
 'Airports',
 'Airsoft',
 'Alternative Medicine',
 'Amateur Sports Teams',
 'American (New)',
 'American (Traditional)',
 'Amusement Parks',
 'Animal Assisted Therapy',
 'Animal Physical Therapy',
 'Animal Shelters',
 'Antiques',
 'Apartments',
 'Appliances',
 'Appliances & Repair',
 'Aquarium Services',
 'Aquariums',
 'Arabian',
 'Arcades',
 'Archery',
 'Argentine',
 'Armenian',
 'Art Classes',
 'Art Galleries',
 'Art Museums',
 'Art Schools',
 'Art Supplies',
 'Arts & Crafts',
 'Arts & Entertainment',
 'Asian Fusion',
 'Audio/Visual Equipment Rental',
 'Australian',
 'Austrian',
 'Auto Customization',
 'Auto Detailing',
 'Auto Glass Services',
 'Auto Insurance',
 'Auto Parts & Supplies',

In [31]:
weird = restBizDF.filter(array_contains(col('categoriesList'), 'Acne Treatment'))
weird.select('name', 'categoriesList').show(10)

+---------------+--------------------+
|           name|      categoriesList|
+---------------+--------------------+
|Canada MedLaser|[Laser Hair Remov...|
+---------------+--------------------+



In [32]:
restCat_count = Counter(restCatList_flat).most_common()

In [33]:
restCat_count

[('Restaurants', 59371),
 ('Food', 14800),
 ('Nightlife', 8562),
 ('Bars', 8182),
 ('Sandwiches', 7332),
 ('Fast Food', 7257),
 ('American (Traditional)', 7107),
 ('Pizza', 6804),
 ('Burgers', 5404),
 ('Breakfast & Brunch', 5381),
 ('American (New)', 4882),
 ('Italian', 4716),
 ('Mexican', 4618),
 ('Chinese', 4428),
 ('Coffee & Tea', 3647),
 ('Cafes', 3232),
 ('Japanese', 2716),
 ('Chicken Wings', 2705),
 ('Event Planning & Services', 2685),
 ('Salad', 2531),
 ('Seafood', 2508),
 ('Sushi Bars', 2258),
 ('Specialty Food', 2091),
 ('Delis', 1955),
 ('Asian Fusion', 1953),
 ('Canadian (New)', 1909),
 ('Mediterranean', 1834),
 ('Bakeries', 1833),
 ('Caterers', 1829),
 ('Barbeque', 1814),
 ('Sports Bars', 1811),
 ('Desserts', 1725),
 ('Steakhouses', 1603),
 ('Pubs', 1491),
 ('Indian', 1489),
 ('Thai', 1449),
 ('Diners', 1433),
 ('Middle Eastern', 1317),
 ('Vietnamese', 1286),
 ('Beer', 1135),
 ('Wine & Spirits', 1135),
 ('Vegetarian', 1123),
 ('Greek', 1086),
 ('Arts & Entertainment', 1037)

In [34]:
# For simplicity, first mark as "rare" every category that occurs fewer than
# MIN_CATEGORY_COUNT times across the restaurant category lists.
# restCat_count is a list of (category, occurrences) pairs.
MIN_CATEGORY_COUNT = 99
drop_cat = [category for category, n_occurrences in restCat_count
            if n_occurrences < MIN_CATEGORY_COUNT]

In [35]:
drop_cat

['Automotive',
 'Golf',
 'Local Services',
 'Arcades',
 'Patisserie/Cake Shop',
 'Brazilian',
 'Do-It-Yourself Food',
 'Delicatessen',
 'Pan Asian',
 'Irish Pub',
 'Seafood Markets',
 'Home & Garden',
 'Coffee Roasteries',
 'Ethiopian',
 'Flowers & Gifts',
 'Butcher',
 'Shopping Centers',
 'Farmers Market',
 'Jazz & Blues',
 'Polish',
 'Kebab',
 'Live/Raw Food',
 'Pool Halls',
 'Cuban',
 'Home Services',
 'Moroccan',
 'Fruits & Veggies',
 'Malaysian',
 'Cheese Shops',
 'Health & Medical',
 'Organic Stores',
 'Pasta Shops',
 'Fashion',
 'Salvadoran',
 'Shaved Ice',
 'Arabian',
 'Education',
 'Chocolatiers & Shops',
 'International Grocery',
 'Custom Cakes',
 'Cupcakes',
 'Himalayan/Nepalese',
 'Russian',
 'Gas Stations',
 'Gelato',
 'Smokehouse',
 'Donairs',
 'Bed & Breakfast',
 'New Mexican Cuisine',
 'Acai Bowls',
 'Performing Arts',
 'Bowling',
 'Professional Services',
 'Belgian',
 'Mongolian',
 'Fondue',
 'Beer Gardens',
 'Art Galleries',
 'Day Spas',
 'Colombian',
 'Arts & Crafts'

In [36]:
len(drop_cat)

627

In [37]:
# Drop every restaurant that carries at least one of the rare categories in
# drop_cat. A single arrays_overlap predicate replaces one chained .filter()
# call per category, so the query plan does not grow with len(drop_cat).
restBizDF_keep = restBizDF
if drop_cat:  # nothing to filter when no category was marked as rare
    rare_cats = array(*[lit(cat) for cat in drop_cat])
    restBizDF_keep = restBizDF.filter(
        ~arrays_overlap(col('categoriesList'), rare_cats)
    )

In [38]:
restCatListKeep_flat = flatten_cats(restBizDF_keep, 'categoriesList')
keep_unique = set(restCatListKeep_flat)
len(keep_unique)

134

In [39]:
Counter(restCatListKeep_flat).most_common()

[('Restaurants', 54002),
 ('Food', 12250),
 ('Nightlife', 7385),
 ('Bars', 7142),
 ('Fast Food', 7094),
 ('Sandwiches', 6832),
 ('American (Traditional)', 6553),
 ('Pizza', 6502),
 ('Burgers', 5228),
 ('Breakfast & Brunch', 4903),
 ('Italian', 4472),
 ('American (New)', 4424),
 ('Mexican', 4383),
 ('Chinese', 4174),
 ('Coffee & Tea', 3176),
 ('Cafes', 2688),
 ('Chicken Wings', 2604),
 ('Japanese', 2566),
 ('Salad', 2312),
 ('Seafood', 2294),
 ('Sushi Bars', 2151),
 ('Event Planning & Services', 2078),
 ('Canadian (New)', 1749),
 ('Asian Fusion', 1719),
 ('Delis', 1707),
 ('Barbeque', 1650),
 ('Sports Bars', 1634),
 ('Mediterranean', 1604),
 ('Caterers', 1591),
 ('Steakhouses', 1446),
 ('Bakeries', 1442),
 ('Desserts', 1381),
 ('Diners', 1379),
 ('Indian', 1342),
 ('Specialty Food', 1326),
 ('Thai', 1325),
 ('Pubs', 1304),
 ('Vietnamese', 1230),
 ('Middle Eastern', 1132),
 ('Greek', 1033),
 ('Beer', 960),
 ('Wine & Spirits', 960),
 ('Vegetarian', 945),
 ('French', 925),
 ('Korean', 888)

In [40]:
#select only reviews for restaurants 
#restIDList = restBizDF_keep.select('business_id').rdd.flatMap(lambda x: x).collect()
#restReviewDF = reviewDF.where(reviewDF.business_id.isin(restIDList))

In [41]:
#print('# of restaurant reviews: ', restReviewDF.count())
#Result : 3712643

In [42]:
# Filter out businesses that are also tagged with a broad non-restaurant
# category: convenience/grocery stores, casinos, hotels and shopping.
NON_RESTAURANT_CATS = [
    'Convenience Stores',
    'Grocery',
    'Casinos',
    'Hotels & Travel',
    'Shopping',
]
restBizDF_keep = restBizDF_keep.filter(
    ~arrays_overlap(
        col('categoriesList'),
        array(*[lit(cat) for cat in NON_RESTAURANT_CATS]),
    )
)
#restBizDF_keep.select('name','categoriesList','attributes','state').show(20)

In [43]:
#temp = restBizDF_keep.filter(array_contains(col('categoriesList'), 'Shopping'))
#temp.select('name','categoriesList','attributes','state').show()

In [44]:
restCatListKeep_flat = flatten_cats(restBizDF_keep, 'categoriesList')
keep_unique = set(restCatListKeep_flat)
len(keep_unique)

128

In [45]:
Counter(restCatListKeep_flat).most_common()

[('Restaurants', 53324),
 ('Food', 11806),
 ('Nightlife', 7265),
 ('Fast Food', 7080),
 ('Bars', 7028),
 ('Sandwiches', 6751),
 ('Pizza', 6471),
 ('American (Traditional)', 6459),
 ('Burgers', 5206),
 ('Breakfast & Brunch', 4845),
 ('Italian', 4432),
 ('American (New)', 4358),
 ('Mexican', 4341),
 ('Chinese', 4158),
 ('Coffee & Tea', 3137),
 ('Cafes', 2649),
 ('Chicken Wings', 2598),
 ('Japanese', 2559),
 ('Salad', 2296),
 ('Seafood', 2272),
 ('Sushi Bars', 2142),
 ('Event Planning & Services', 1861),
 ('Canadian (New)', 1721),
 ('Asian Fusion', 1712),
 ('Barbeque', 1640),
 ('Sports Bars', 1614),
 ('Mediterranean', 1577),
 ('Delis', 1570),
 ('Caterers', 1548),
 ('Steakhouses', 1421),
 ('Bakeries', 1383),
 ('Desserts', 1367),
 ('Diners', 1366),
 ('Indian', 1326),
 ('Thai', 1320),
 ('Pubs', 1284),
 ('Specialty Food', 1224),
 ('Vietnamese', 1222),
 ('Middle Eastern', 1114),
 ('Greek', 1024),
 ('Vegetarian', 933),
 ('French', 915),
 ('Beer', 907),
 ('Wine & Spirits', 907),
 ('Korean', 877)

In [46]:
# Keep only the reviews whose business survived the restaurant filters.
# A left-semi join does the matching inside Spark and returns only reviewDF's
# columns, instead of collecting every business_id to the driver and sending
# the whole list back as one isin() literal list.
restReviewDF = reviewDF.join(
    restBizDF_keep.select('business_id'),
    on='business_id',
    how='left_semi',
)

In [47]:
print('# of restaurant reviews: ', restReviewDF.count())
#Result : 3643450

# of restaurant reviews:  3643450


In [48]:
restReviewDF.columns

['business_id',
 'cool',
 'date',
 'funny',
 'review_id',
 'stars',
 'text',
 'useful',
 'user_id']

In [49]:
restReviewDF = restReviewDF[['business_id', 'date', 'review_id', 'stars', 'text', 'useful']]

In [50]:
# Overwrite a previous export so the notebook can be re-run from a fresh
# kernel; the default save mode raises if the output path already exists.
restBizDF_keep.write.mode('overwrite').json('../SavedFiles/restBiz.json')

In [51]:
# Overwrite a previous export so the notebook can be re-run from a fresh
# kernel; the default save mode raises if the output path already exists.
restReviewDF.write.mode('overwrite').json('../SavedFiles/restReview.json')