# Setup

In [9]:
from pyspark.sql import functions as F
from pyspark.sql.functions import to_timestamp

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .appName('Yelp! Data Exploration')\
  .getOrCreate()

In [3]:
# Directory where the raw data is stored
data_folder = "gs://ca4022_yelp_data/data_raw/"

# Processing

## Cleaning the Business Table

In [3]:
df_bus = spark.read.csv(data_folder + "business.csv", header=True, sep= ",", inferSchema=True)

Inspecting the data...

In [4]:
df_bus.show(10, truncate=False)

+----------------------+----------------------------+----------------------------+---------------+-----+-----------+-------------+--------------+-----+------------+-------+------------------------------------------------------------------------------------------------+
|business_id           |name                        |address                     |city           |state|postal_code|latitude     |longitude     |stars|review_count|is_open|categories                                                                                      |
+----------------------+----------------------------+----------------------------+---------------+-----+-----------+-------------+--------------+-----+------------+-------+------------------------------------------------------------------------------------------------+
|f9NumwFMBDn751xgFiRbNA|The Range At Lake Norman    |10913 Bailey Rd             |Cornelius      |NC   |28031      |35.4627242   |-80.8526119   |3.5  |36.0        |1.0    |Active Life, Gun/R

This is not very readable so we employ the toPandas() function available in PySpark to display the Spark DataFrame as a pandas DataFrame

In [6]:
df_bus.limit(10).toPandas()

Unnamed: 0,business_id,name,address,city,state,postal_code,latitude,longitude,stars,review_count,is_open,categories
0,f9NumwFMBDn751xgFiRbNA,The Range At Lake Norman,10913 Bailey Rd,Cornelius,NC,28031,35.4627242,-80.8526119,3.5,36.0,1.0,"Active Life, Gun/Rifle Ranges, Guns & Ammo, Sh..."
1,Yzvjg0SayhoZgCljUJRF9Q,"Carlos Santo, NMD","8880 E Via Linda, Ste 107",Scottsdale,AZ,85258,33.5694041,-111.8902637,5.0,4.0,1.0,"Health & Medical, Fitness & Instruction, Yoga,..."
2,XNoUzKckATkOD1hP6vghZg,Felinus,3554 Rue Notre-Dame O,Montreal,QC,H4C 1P4,45.479984,-73.58007,5.0,5.0,1.0,"Pets, Pet Services, Pet Groomers"
3,6OAZjbxqM5ol29BuHsil3w,Nevada House of Hose,1015 Sharp Cir,North Las Vegas,NV,89030,36.2197281,-115.1277255,2.5,3.0,0.0,"Hardware Stores, Home Services, Building Suppl..."
4,51M2Kk903DFYI6gnB5I6SQ,USE MY GUY SERVICES LLC,4827 E Downing Cir,Mesa,AZ,85205,33.4280652,-111.7266485,4.5,26.0,1.0,"Home Services, Plumbing, Electricians, Handyma..."
5,cKyLV5oWZJ2NudWgqs8VZw,Oasis Auto Center - Gilbert,"1720 W Elliot Rd, Ste 105",Gilbert,AZ,85233,33.3503993,-111.8271417,4.5,38.0,1.0,"Auto Repair, Automotive, Oil Change Stations, ..."
6,oiAlXZPIFm2nBCt0DHLu_Q,Green World Cleaners,"6870 S Rainbow Blvd, Ste 117",Las Vegas,NV,89118,36.0639767,-115.241463,3.5,81.0,1.0,"Dry Cleaning & Laundry, Local Services, Laundr..."
7,ScYkbYNkDgCneBrD9vqhCQ,Junction Tire & Auto Service,6910 E Southern Ave,Mesa,AZ,85209,33.3938847,-111.6822257,5.0,18.0,1.0,"Auto Repair, Oil Change Stations, Automotive, ..."
8,pQeaRpvuhoEqudo3uymHIQ,The Empanadas House,404 E Green St,Champaign,IL,61820,40.1104457,-88.2330726,4.5,5.0,1.0,"Ethnic Food, Food Trucks, Specialty Food, Impo..."
9,EosRKXIGeSWFYWwpkbhNnA,Xtreme Couture,700 Kipling Avenue Etobicoke,Toronto,ON,M8Z 5G3,43.6245394916,-79.5291079302,3.0,16.0,1.0,"Martial Arts, Gyms, Fitness & Instruction, Act..."


By quickly looking at a small sample of the data, we can see that it appears relatively clean but more analysis is needed to confirm this. Let's check the schema of the data...

In [7]:
df_bus.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- stars: string (nullable = true)
 |-- review_count: double (nullable = true)
 |-- is_open: double (nullable = true)
 |-- categories: string (nullable = true)



We can see from the schema that most of the columns are strings and the majority of these are fine but arguably, the lat, long and average star rating should be doubles.

In [8]:
# How many rows are in the business table?
df_bus.count()

209393

To determine if we can convert the data in these columns to doubles, we must test how many non-null values are present when we cast the data in each of the columns to double.

In [10]:
# Determine how many rows in each column convert properly?
print("Latitude: ", df_bus.filter(F.col("latitude").cast("double").isNotNull()).count())
print("Longitude: ", df_bus.filter(F.col("longitude").cast("double").isNotNull()).count())
print("Stars: ", df_bus.filter(F.col("stars").cast("double").isNotNull()).count())

Latitude:  209391
Longitude:  209392
Stars:  209392


So there are a very small number of rows which don't convert properly, let's examine these...

In [12]:
df_bus.filter(F.col("latitude").cast("double").isNull()).toPandas()

Unnamed: 0,business_id,name,address,city,state,postal_code,latitude,longitude,stars,review_count,is_open,categories
0,wUKqPYvtBwrNk-QHgPtaxg,McDonald's,"""30 Brisdale Rd, Bldg """"C""""","Building C""",Brampton,ON,L7A 3G1,43.6821784,-79.8138718,3.0,8.0,1.0
1,P132nWdSsbJn5UyOwm9uAg,"""Arash Giancarlo """"Gianni"""" Vishteh",MD,PC,FACS,"FAANS""","18404 N Tatum Blvd, Unit 205",Phoenix,AZ,85032.0,33.65407,-111.9784252


In [13]:
df_bus.filter(F.col("longitude").cast("double").isNull()).toPandas()

Unnamed: 0,business_id,name,address,city,state,postal_code,latitude,longitude,stars,review_count,is_open,categories
0,P132nWdSsbJn5UyOwm9uAg,"""Arash Giancarlo """"Gianni"""" Vishteh",MD,PC,FACS,"FAANS""","18404 N Tatum Blvd, Unit 205",Phoenix,AZ,85032.0,33.65407,-111.9784252


Same record as above

In [14]:
df_bus.filter(F.col("stars").cast("double").isNull()).toPandas()

Unnamed: 0,business_id,name,address,city,state,postal_code,latitude,longitude,stars,review_count,is_open,categories
0,P132nWdSsbJn5UyOwm9uAg,"""Arash Giancarlo """"Gianni"""" Vishteh",MD,PC,FACS,"FAANS""","18404 N Tatum Blvd, Unit 205",Phoenix,AZ,85032.0,33.65407,-111.9784252


So, we will filter out both of these rows...

In [15]:
df_bus = df_bus.filter(F.col("latitude").cast("double").isNotNull())

In [17]:
df_bus.count()

209391

Now, we can change the type of each column to double..

In [18]:
from pyspark.sql.types import DoubleType

df_bus = df_bus.withColumn("latitude", df_bus["latitude"].cast("double"))
df_bus = df_bus.withColumn("longitude", df_bus["longitude"].cast("double"))
df_bus = df_bus.withColumn("stars", df_bus["stars"].cast("double"))

In [20]:
df_bus.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- stars: double (nullable = true)
 |-- review_count: double (nullable = true)
 |-- is_open: double (nullable = true)
 |-- categories: string (nullable = true)



In [24]:
df_bus.count()

209391

In [28]:
# Writing the cleaned business DataFrame to our cloud storage bucket
df_bus.write.option("header", "true").option("sep", ",").mode("overwrite").csv("gs://ca4022_yelp_data/data_clean/business.csv")

In [29]:
df = spark.read.csv("gs://ca4022_yelp_data/data_clean/business.csv", header=True, sep= ",", inferSchema=True)

In [30]:
df.limit(10).toPandas()

Unnamed: 0,business_id,name,address,city,state,postal_code,latitude,longitude,stars,review_count,is_open,categories
0,f9NumwFMBDn751xgFiRbNA,The Range At Lake Norman,10913 Bailey Rd,Cornelius,NC,28031,35.462724,-80.852612,3.5,36.0,1.0,"Active Life, Gun/Rifle Ranges, Guns & Ammo, Sh..."
1,Yzvjg0SayhoZgCljUJRF9Q,"Carlos Santo, NMD","8880 E Via Linda, Ste 107",Scottsdale,AZ,85258,33.569404,-111.890264,5.0,4.0,1.0,"Health & Medical, Fitness & Instruction, Yoga,..."
2,XNoUzKckATkOD1hP6vghZg,Felinus,3554 Rue Notre-Dame O,Montreal,QC,H4C 1P4,45.479984,-73.58007,5.0,5.0,1.0,"Pets, Pet Services, Pet Groomers"
3,6OAZjbxqM5ol29BuHsil3w,Nevada House of Hose,1015 Sharp Cir,North Las Vegas,NV,89030,36.219728,-115.127725,2.5,3.0,0.0,"Hardware Stores, Home Services, Building Suppl..."
4,51M2Kk903DFYI6gnB5I6SQ,USE MY GUY SERVICES LLC,4827 E Downing Cir,Mesa,AZ,85205,33.428065,-111.726648,4.5,26.0,1.0,"Home Services, Plumbing, Electricians, Handyma..."
5,cKyLV5oWZJ2NudWgqs8VZw,Oasis Auto Center - Gilbert,"1720 W Elliot Rd, Ste 105",Gilbert,AZ,85233,33.350399,-111.827142,4.5,38.0,1.0,"Auto Repair, Automotive, Oil Change Stations, ..."
6,oiAlXZPIFm2nBCt0DHLu_Q,Green World Cleaners,"6870 S Rainbow Blvd, Ste 117",Las Vegas,NV,89118,36.063977,-115.241463,3.5,81.0,1.0,"Dry Cleaning & Laundry, Local Services, Laundr..."
7,ScYkbYNkDgCneBrD9vqhCQ,Junction Tire & Auto Service,6910 E Southern Ave,Mesa,AZ,85209,33.393885,-111.682226,5.0,18.0,1.0,"Auto Repair, Oil Change Stations, Automotive, ..."
8,pQeaRpvuhoEqudo3uymHIQ,The Empanadas House,404 E Green St,Champaign,IL,61820,40.110446,-88.233073,4.5,5.0,1.0,"Ethnic Food, Food Trucks, Specialty Food, Impo..."
9,EosRKXIGeSWFYWwpkbhNnA,Xtreme Couture,700 Kipling Avenue Etobicoke,Toronto,ON,M8Z 5G3,43.624539,-79.529108,3.0,16.0,1.0,"Martial Arts, Gyms, Fitness & Instruction, Act..."


## Cleaning the Business Hours Table

**Note:** Minimal commentary is provided for the remainder of this notebook as a lot of repeated steps are carried out.

In [31]:
df_bus_hrs = spark.read.csv(data_folder + "business_hours.csv", header=True, sep= ",", inferSchema=True)

Inspecting the data...

In [32]:
df_bus_hrs.limit(10).toPandas()

Unnamed: 0,business_id,Monday,Tuesday,Wednesday,Thursday,Friday,Saturday,Sunday
0,f9NumwFMBDn751xgFiRbNA,10:0-18:0,11:0-20:0,10:0-18:0,11:0-20:0,11:0-20:0,11:0-20:0,13:0-18:0
1,6OAZjbxqM5ol29BuHsil3w,7:0-18:0,7:0-18:0,7:0-18:0,7:0-18:0,7:0-18:0,7:0-15:0,
2,51M2Kk903DFYI6gnB5I6SQ,7:0-19:0,7:0-19:0,7:0-19:0,7:0-19:0,7:0-19:0,9:0-17:0,
3,cKyLV5oWZJ2NudWgqs8VZw,7:30-17:0,7:30-17:0,7:30-17:0,7:30-17:0,7:30-17:0,7:30-13:0,
4,oiAlXZPIFm2nBCt0DHLu_Q,11:30-14:30,11:30-14:30,11:30-14:30,11:30-14:30,11:30-14:30,,
5,ScYkbYNkDgCneBrD9vqhCQ,5:30-23:0,5:30-23:0,5:30-23:0,5:30-23:0,5:30-23:0,8:0-19:0,8:0-17:0
6,pQeaRpvuhoEqudo3uymHIQ,7:0-17:0,7:0-17:0,7:0-17:0,7:0-17:0,7:0-17:0,7:0-17:0,
7,EosRKXIGeSWFYWwpkbhNnA,8:0-17:0,8:0-17:0,,8:0-17:0,8:0-17:0,8:0-17:0,
8,MbZMmwo-eL0Jnm_Yb9KJrA,0:0-0:0,11:0-21:0,11:0-21:0,11:0-21:0,11:0-22:0,11:0-22:0,11:0-21:0
9,7Dv4_HAxsxvadEsT5fxQBg,9:0-16:0,9:0-16:0,9:0-16:0,9:0-16:0,9:0-16:0,,


In [33]:
df_bus_hrs.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- Monday: string (nullable = true)
 |-- Tuesday: string (nullable = true)
 |-- Wednesday: string (nullable = true)
 |-- Thursday: string (nullable = true)
 |-- Friday: string (nullable = true)
 |-- Saturday: string (nullable = true)
 |-- Sunday: string (nullable = true)



In [34]:
# How many rows are in the business hours table?
df_bus_hrs.count()

164550

So information on the opening hours are not present for every business; only approximately 160k out of about 210k, but this is fine. Write DataFrane to new CSV file in our Cloud Storage Bucket.

In [35]:
# Writing the cleaned business DataFrame to our cloud storage bucket
df_bus_hrs.write.option("header", "true").option("sep", ",").mode("overwrite").csv("gs://ca4022_yelp_data/data_clean/business_hours.csv")

# Cleaning the Business Attributes Table

In [37]:
df_bus_attr = spark.read.csv(data_folder + "business_attributes.csv", header=True, sep= ",", inferSchema=True)

Inspecting the data...

In [38]:
df_bus_attr.limit(10).toPandas()

Unnamed: 0,business_id,BusinessAcceptsCreditCards,BikeParking,GoodForKids,BusinessParking,ByAppointmentOnly,RestaurantsPriceRange2,DogsAllowed,WiFi,RestaurantsAttire,...,BestNights,Music,Smoking,DietaryRestrictions,DriveThru,HairSpecializesIn,BYOBCorkage,AgesAllowed,RestaurantsCounterService,Open24Hours
0,f9NumwFMBDn751xgFiRbNA,True,True,False,"{'garage': False, 'street': False, 'validated'...",False,3.0,,,,...,,,,,,,,,,
1,Yzvjg0SayhoZgCljUJRF9Q,,,True,,True,,,,,...,,,,,,,,,,
2,6OAZjbxqM5ol29BuHsil3w,True,,,,True,,,,,...,,,,,,,,,,
3,51M2Kk903DFYI6gnB5I6SQ,True,,,,,,,,,...,,,,,,,,,,
4,cKyLV5oWZJ2NudWgqs8VZw,True,True,,"{'garage': False, 'street': False, 'validated'...",,1.0,,u'no',,...,,,,,,,,,,
5,oiAlXZPIFm2nBCt0DHLu_Q,True,,,,False,,,,,...,,,,,,,,,,
6,ScYkbYNkDgCneBrD9vqhCQ,True,False,False,,False,1.0,,,u'casual',...,,,,,,,,,,
7,pQeaRpvuhoEqudo3uymHIQ,,False,True,"{'garage': False, 'street': False, 'validated'...",True,,,,,...,,,,,,,,,,
8,EosRKXIGeSWFYWwpkbhNnA,True,,,,,,,,,...,,,,,,,,,,
9,7Dv4_HAxsxvadEsT5fxQBg,True,,,,True,,,,,...,,,,,,,,,,


In [41]:
# How many rows are in the business table?
df_bus_attr.count()

180348

By quickly looking at a small sample of the data, we can see that it appears relatively clean but more analysis is needed to confirm this. Let's check the schema of the data...

In [39]:
df_bus_attr.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- BusinessAcceptsCreditCards: string (nullable = true)
 |-- BikeParking: string (nullable = true)
 |-- GoodForKids: string (nullable = true)
 |-- BusinessParking: string (nullable = true)
 |-- ByAppointmentOnly: string (nullable = true)
 |-- RestaurantsPriceRange2: string (nullable = true)
 |-- DogsAllowed: string (nullable = true)
 |-- WiFi: string (nullable = true)
 |-- RestaurantsAttire: string (nullable = true)
 |-- RestaurantsTakeOut: string (nullable = true)
 |-- NoiseLevel: string (nullable = true)
 |-- RestaurantsReservations: string (nullable = true)
 |-- RestaurantsGoodForGroups: string (nullable = true)
 |-- HasTV: string (nullable = true)
 |-- Alcohol: string (nullable = true)
 |-- RestaurantsDelivery: string (nullable = true)
 |-- OutdoorSeating: string (nullable = true)
 |-- Caters: string (nullable = true)
 |-- WheelchairAccessible: string (nullable = true)
 |-- AcceptsInsurance: string (nullable = true)
 |-- Restaurants

In [42]:
df_bus_attr.columns

['business_id',
 'BusinessAcceptsCreditCards',
 'BikeParking',
 'GoodForKids',
 'BusinessParking',
 'ByAppointmentOnly',
 'RestaurantsPriceRange2',
 'DogsAllowed',
 'WiFi',
 'RestaurantsAttire',
 'RestaurantsTakeOut',
 'NoiseLevel',
 'RestaurantsReservations',
 'RestaurantsGoodForGroups',
 'HasTV',
 'Alcohol',
 'RestaurantsDelivery',
 'OutdoorSeating',
 'Caters',
 'WheelchairAccessible',
 'AcceptsInsurance',
 'RestaurantsTableService',
 'Ambience',
 'GoodForMeal',
 'HappyHour',
 'BusinessAcceptsBitcoin',
 'BYOB',
 'Corkage',
 'GoodForDancing',
 'CoatCheck',
 'BestNights',
 'Music',
 'Smoking',
 'DietaryRestrictions',
 'DriveThru',
 'HairSpecializesIn',
 'BYOBCorkage',
 'AgesAllowed',
 'RestaurantsCounterService',
 'Open24Hours']

We can see from the schema that most of the columns are string data types and the majority should be boolean types. To determine if we can convert the data in these columns to boolean, we must test how many non-null values are present when we cast the data in each of the columns to boolean. First we need to see how many non-null values are each column before testing the conversion to boolean.

In [44]:
# Determine how many rows in each column are non-null?
print("Total rows: ", df_bus_attr.count())

for col in df_bus_attr.columns:
    print(f"{col}: ", df_bus_attr.filter(F.col(f"{col}").isNotNull()).count())

Total rows:  180348
business_id:  180348
BusinessAcceptsCreditCards:  105261
BikeParking:  77428
GoodForKids:  59097
BusinessParking:  99288
ByAppointmentOnly:  52166
RestaurantsPriceRange2:  95931
DogsAllowed:  15088
WiFi:  56298
RestaurantsAttire:  42672
RestaurantsTakeOut:  57057
NoiseLevel:  40078
RestaurantsReservations:  47627
RestaurantsGoodForGroups:  48398
HasTV:  46035
Alcohol:  43770
RestaurantsDelivery:  48805
OutdoorSeating:  50425
Caters:  37792
WheelchairAccessible:  24628
AcceptsInsurance:  7438
RestaurantsTableService:  17873
Ambience:  46372
GoodForMeal:  30298
HappyHour:  13205
BusinessAcceptsBitcoin:  14262
BYOB:  634
Corkage:  934
GoodForDancing:  4487
CoatCheck:  4261
BestNights:  4730
Music:  6846
Smoking:  3690
DietaryRestrictions:  52
DriveThru:  3677
HairSpecializesIn:  1081
BYOBCorkage:  1213
AgesAllowed:  123
RestaurantsCounterService:  9
Open24Hours:  11


In [46]:
from pyspark.sql.types import BooleanType

# Determine how many rows in each column convert properly? For ease, we will attempt to convert all
print("Total rows: ", df_bus_attr.count())

for col in df_bus_attr.columns:
    print(f"{col}: ", df_bus_attr.filter(F.col(f"{col}").cast("boolean").isNotNull()).count())

Total rows:  180348
business_id:  0
BusinessAcceptsCreditCards:  105183
BikeParking:  77249
GoodForKids:  59029
BusinessParking:  0
ByAppointmentOnly:  52088
RestaurantsPriceRange2:  30485
DogsAllowed:  15056
WiFi:  0
RestaurantsAttire:  0
RestaurantsTakeOut:  56877
NoiseLevel:  0
RestaurantsReservations:  47345
RestaurantsGoodForGroups:  48350
HasTV:  45993
Alcohol:  0
RestaurantsDelivery:  47976
OutdoorSeating:  50364
Caters:  37673
WheelchairAccessible:  24594
AcceptsInsurance:  7425
RestaurantsTableService:  17844
Ambience:  0
GoodForMeal:  0
HappyHour:  13201
BusinessAcceptsBitcoin:  14259
BYOB:  633
Corkage:  930
GoodForDancing:  4481
CoatCheck:  4256
BestNights:  0
Music:  0
Smoking:  0
DietaryRestrictions:  0
DriveThru:  3673
HairSpecializesIn:  0
BYOBCorkage:  0
AgesAllowed:  0
RestaurantsCounterService:  9
Open24Hours:  11


So there are a quite a few rows in each which don't convert properly. In the interest of time and usefulness, we are not going to convert these at the moment. If a specific need arises during our analysis to convert these, we can return to this notebook and do so.

In [47]:
# Writing the cleaned business attributes DataFrame to our cloud storage bucket
df_bus_attr.write.option("header", "true").option("sep", ",").mode("overwrite").csv("gs://ca4022_yelp_data/data_clean/business_attributes.csv")

## Cleaning the Checkin Table

In [48]:
df_checkin = spark.read.csv(data_folder + "checkin.csv", header=True, sep= ",", inferSchema=True)

Inspecting the data...

In [62]:
df_checkin.limit(5).toPandas()

Unnamed: 0,business_id,date
0,--1UhMGODdWsrMastO9DZw,"2016-04-26 19:49:16, 2016-08-30 18:36:57, 2016..."
1,--6MefnULPED_I942VcFNA,"2011-06-04 18:22:23, 2011-07-23 23:51:33, 2012..."
2,--7zmmkVg-IMGaXbuVd0SQ,"2014-12-29 19:25:50, 2015-01-17 01:49:14, 2015..."
3,--8LPVSo5i0Oo61X01sV9A,2016-07-08 16:43:30
4,--9QQLMTbFzLJ_oT-ON3Xw,"2010-06-26 17:39:07, 2010-08-01 20:06:21, 2010..."


By quickly looking at a small sample of the data, each checkin timestamp (format: yyyy-mm-dd hh:mm:ss) is stored in the date column in a comma-seperated list. There are varying amounts of checkins for each business so extracting these would be difficult from a data design perspective. In the interest of time, we will leave these as is unless a specific need arises for us to extract them.

In [52]:
df_checkin.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- date: string (nullable = true)



In [53]:
# How many rows are in the checkin table?
df_checkin.count()

175187

Let's first see if there are any NULL values in the date column...

In [54]:
df_checkin.filter(F.col("date").isNull()).count()

0

In [65]:
# Writing the cleaned checkin DataFrame to our cloud storage bucket
df_checkin.write.option("header", "true").option("sep", ",").mode("overwrite").csv("gs://ca4022_yelp_data/data_clean/checkin.csv")

## Cleaning the Tip Table

In [4]:
df_tip = spark.read.csv(data_folder + "tip.csv", header=True, sep= ",", inferSchema=True)

Inspecting the data...

In [5]:
df_tip.limit(10).toPandas()

Unnamed: 0,user_id,business_id,text,date,compliment_count
0,hf27xTME3EiCp6NL6VtWZQ,UYX5zL_Xj9WEc_Wp-FrqHw,Here for a quick mtg,2013-11-26 18:20:08,0
1,uEvusDwoSymbJJ0auR3muQ,Ch3HkwQYv1YKw_FO06vBWA,Cucumber strawberry refresher,2014-06-15 22:26:45,0
2,AY-laIws3S7YXNl_f_D6rQ,rDoT-MgxGRiYqCmi0bG10g,Very nice good service good food,2016-07-18 22:03:42,0
3,Ue_7yUlkEbX4AhnYdUfL7g,OHXnDV01gLokiX1ELaQufA,It's a small place. The staff is friendly.,2014-06-06 01:10:34,0
4,LltbT_fUMqZ-ZJP-vJ84IQ,GMrwDXRlAZU2zj5nH6l4vQ,"8 sandwiches, $24 total...what a bargain!!! An...",2011-04-08 18:12:01,0
5,HHNBqfbDR8b1iq-QGxu8ww,ALwAlxItASeEs2vYAeLXHA,Great ramen! Not only is the presentation gorg...,2015-05-20 20:17:38,0
6,r0j4IpUbcdC1-HfoMYae4w,d_L-rfS1vT3JMzgCUGtiow,Cochinita Pibil was memorable & delicious !,2014-09-01 01:23:48,0
7,gxVQZJVeKBUk7jEhSyqv-A,5FIOXmUE3qMviX9GafGH-Q,Get a tsoynami for sure.,2010-01-30 02:03:16,0
8,2hdR7KYAmnCk2FjTnPFsuw,rcaPajgKOJC2vo_l3xa42A,Kelly is an awesome waitress there!,2012-05-29 02:05:56,0
9,DsWg3leomfasGs3j0rOfbQ,hfBrethLHS9iXeBNR8MtzQ,Check out the great assortment of organic & co...,2011-09-30 18:38:47,0


By quickly looking at a small sample of the data, we can see that it appears relatively clean but more analysis is needed to confirm this. Let's check the schema of the data...

In [6]:
df_tip.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- date: string (nullable = true)
 |-- compliment_count: string (nullable = true)



We can see from the schema that all of the columns are strings and the majority of these are fine but the compliment_count column should be an integer type. It is also probably preferable to convert the date column to an actual timestamp data type.

In [8]:
# How many rows are in the tip table?
df_tip.count()

1363162

To determine if we can convert the data in these columns, we must test how many non-null values are present when we cast the data in each of the columns.

In [9]:
# Determine how many rows in each column convert properly?
print("Date: ", df_tip.filter(to_timestamp(df_tip.date, 'yyyy-MM-dd HH:mm:ss').isNotNull()).count())
print("Compliment Count: ", df_tip.filter(F.col("compliment_count").cast("int").isNotNull()).count())

Date:  1289856
Compliment Count:  1289859


So there are a lot of rows which don't convert properly, let's examine these...

In [10]:
# Rows where the date column doesn't convert properly
df_tip.filter(to_timestamp(df_tip.date, 'yyyy-MM-dd HH:mm:ss').isNull()).limit(10).toPandas()

Unnamed: 0,user_id,business_id,text,date,compliment_count
0,Cg189N5utFD_4VxXImPZjg,MyTsAeShB9pcWP84GSTAdw,1. Kimchi salsa,,
1,2. Bulgogi burrito,,,,
2,"3. Go!""",2013-12-22 21:53:23,0,,
3,0sl4I8R7znWXp_K4A0nZ_w,VgSF_jQ8St7A2DhdafRDCQ,"""Affectionately called """"The Queue"""" by locals",and home to the 2015-16 NBA Champion Clevelan...,2017-05-10 00:26:31
4,-vfjOBjztq9rOtOK7YMVXQ,l-Zjik0MbpabQPv-nNp9YQ,Street parking went early took home apple frit...,,
5,Took a Huge cold cut sandwich and Greek salad ...,great bread fresh,,,
6,Next time porchetta sandwich,,,,
7,Guess what porchetta in assorted with cracking...,2014-03-15 16:40:21,0,,
8,JX7mjMODvt96vQIj9ENzhw,hTq7pFDjS0o3KhheKGkvBw,I have intel telling me they really do what th...,,
9,Be kind and don't allow your bad attitude to t...,2016-12-13 17:17:54,0,,


In [11]:
# Rows where the compliment_count column doesn't convert properly
df_tip.filter(F.col("compliment_count").cast("int").isNull()).limit(10).toPandas()

Unnamed: 0,user_id,business_id,text,date,compliment_count
0,Cg189N5utFD_4VxXImPZjg,MyTsAeShB9pcWP84GSTAdw,1. Kimchi salsa,,
1,2. Bulgogi burrito,,,,
2,"3. Go!""",2013-12-22 21:53:23,0,,
3,0sl4I8R7znWXp_K4A0nZ_w,VgSF_jQ8St7A2DhdafRDCQ,"""Affectionately called """"The Queue"""" by locals",and home to the 2015-16 NBA Champion Clevelan...,2017-05-10 00:26:31
4,-vfjOBjztq9rOtOK7YMVXQ,l-Zjik0MbpabQPv-nNp9YQ,Street parking went early took home apple frit...,,
5,Took a Huge cold cut sandwich and Greek salad ...,great bread fresh,,,
6,Next time porchetta sandwich,,,,
7,Guess what porchetta in assorted with cracking...,2014-03-15 16:40:21,0,,
8,JX7mjMODvt96vQIj9ENzhw,hTq7pFDjS0o3KhheKGkvBw,I have intel telling me they really do what th...,,
9,Be kind and don't allow your bad attitude to t...,2016-12-13 17:17:54,0,,


Clearly, these erroneous rows are caused by incorrect parsing of the CSV file from which they are read. In the interest of time, we will filter out all of these rows...

In [12]:
df_tip = df_tip.filter(to_timestamp(df_tip.date, 'yyyy-MM-dd HH:mm:ss').isNotNull())

In [13]:
df_tip.count()

1289856

Now, we can change the type of each column...

In [14]:
df_tip = df_tip.withColumn("date", to_timestamp(df_tip.date, 'yyyy-MM-dd HH:mm:ss'))
df_tip = df_tip.withColumn("compliment_count", df_tip["compliment_count"].cast("int"))

In [15]:
df_tip.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- compliment_count: integer (nullable = true)



In [18]:
df_tip.limit(10).toPandas()

Unnamed: 0,user_id,business_id,text,date,compliment_count
0,hf27xTME3EiCp6NL6VtWZQ,UYX5zL_Xj9WEc_Wp-FrqHw,Here for a quick mtg,2013-11-26 18:20:08,0
1,uEvusDwoSymbJJ0auR3muQ,Ch3HkwQYv1YKw_FO06vBWA,Cucumber strawberry refresher,2014-06-15 22:26:45,0
2,AY-laIws3S7YXNl_f_D6rQ,rDoT-MgxGRiYqCmi0bG10g,Very nice good service good food,2016-07-18 22:03:42,0
3,Ue_7yUlkEbX4AhnYdUfL7g,OHXnDV01gLokiX1ELaQufA,It's a small place. The staff is friendly.,2014-06-06 01:10:34,0
4,LltbT_fUMqZ-ZJP-vJ84IQ,GMrwDXRlAZU2zj5nH6l4vQ,"8 sandwiches, $24 total...what a bargain!!! An...",2011-04-08 18:12:01,0
5,HHNBqfbDR8b1iq-QGxu8ww,ALwAlxItASeEs2vYAeLXHA,Great ramen! Not only is the presentation gorg...,2015-05-20 20:17:38,0
6,r0j4IpUbcdC1-HfoMYae4w,d_L-rfS1vT3JMzgCUGtiow,Cochinita Pibil was memorable & delicious !,2014-09-01 01:23:48,0
7,gxVQZJVeKBUk7jEhSyqv-A,5FIOXmUE3qMviX9GafGH-Q,Get a tsoynami for sure.,2010-01-30 02:03:16,0
8,2hdR7KYAmnCk2FjTnPFsuw,rcaPajgKOJC2vo_l3xa42A,Kelly is an awesome waitress there!,2012-05-29 02:05:56,0
9,DsWg3leomfasGs3j0rOfbQ,hfBrethLHS9iXeBNR8MtzQ,Check out the great assortment of organic & co...,2011-09-30 18:38:47,0


We can now write this cleaned DataFrame to a new CSV file in our Cloud Storage Bucket.

In [19]:
df_tip.write.option("header", "true").option("sep", ",").mode("overwrite").csv("gs://ca4022_yelp_data/data_clean/tip.csv")

## Cleaning the Review Table

The reviews and users tables were the only ones which weren't previously converted to CSV files due to their size. They are stored as JSON files and we will have to read them into Spark as so.

In [4]:
df_review = spark.read.json("gs://ca4022_yelp_data/data_raw/yelp_academic_dataset_review.json")

In [5]:
df_review.limit(10).toPandas()

Unnamed: 0,business_id,cool,date,funny,review_id,stars,text,useful,user_id
0,-MhfebM0QIsKt87iDN-FNw,0,2015-04-15 05:21:16,0,xQY8N_XvtGbearJ5X4QryQ,2.0,"As someone who has worked with many museums, I...",5,OwjRMXRC0KyPrIlcjaXeFQ
1,lbrU8StCq3yDfr-QMnGrmQ,0,2013-12-07 03:16:52,1,UmFMZ8PyXZTY2QcwzsfQYA,1.0,I am actually horrified this place is still in...,1,nIJD_7ZXHq-FX8byPMOkMQ
2,HQl28KMwrEKHqhFrrDqVNQ,0,2015-12-05 03:18:11,0,LG2ZaYiOgpr2DK_90pYjNw,5.0,I love Deagan's. I do. I really do. The atmosp...,1,V34qejxNsCbcgD8C0HVk-Q
3,5JxlZaqCnk1MnbgRirs40Q,0,2011-05-27 05:30:52,0,i6g_oA9Yf9Y31qt0wibXpw,1.0,"Dismal, lukewarm, defrosted-tasting ""TexMex"" g...",0,ofKDkJKXSKZXu5xJNGiiBQ
4,IS4cv902ykd8wj1TR0N3-A,0,2017-01-14 21:56:57,0,6TdNDKywdbjoTkizeMce8A,4.0,"Oh happy day, finally have a Canes near my cas...",0,UgMW8bLE0QMJDCkQ1Ax5Mg
5,nlxHRv1zXGT0c0K51q3jDg,0,2013-05-07 07:25:25,0,L2O_INwlrRuoX05KSjc4eg,5.0,This is definitely my favorite fast food sub s...,2,5vD2kmE25YBrbayKhykNxQ
6,Pthe4qk5xh4n-ef-9bvMSg,0,2015-11-05 23:11:05,0,ZayJ1zWyWgY9S_TRLT_y9Q,5.0,"Really good place with simple decor, amazing f...",1,aq_ZxGHiri48TUXJlpRkCQ
7,FNCJpSn0tL9iqoY3JC73qw,0,2017-07-18 18:31:54,0,lpFIJYpsvDxyph-kPzZ6aA,5.0,"Awesome office and staff, very professional an...",0,dsd-KNYKMpx6ma_sRWCSkQ
8,e_BiI4ej1CW1F0EyVLr-FQ,0,2015-02-16 06:48:47,0,JA-xnyHytKiOIHl_ztnK9Q,5.0,Most delicious authentic Italian I've had in t...,0,P6apihD4ASf1vpPxHODxAQ
9,Ws8V970-mQt2X9CwCuT5zw,1,2009-10-13 04:16:41,0,z4BCgTkfNtCu4XY5Lp97ww,4.0,I have been here twice. Very nice and laid bac...,3,jOERvhmK6_lo_XGUBPws_w


In [6]:
df_review.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



We can see from the schema that most of the columns are the correct data type. The only column which needs to be converted is the date column.

In [7]:
# How many rows are in the business table?
df_review.count()

8021122

Firstly, we must test how many non-null values are present when we cast the data in the date column to a timestamp.

In [None]:
print("Date: ", df_review.filter(to_timestamp(df_review.date, 'yyyy-MM-dd HH:mm:ss').isNotNull()).count())

All of the rows convert properly so we simply need to cast this column and then write the DataFrame as before.

In [26]:
df_review = df_review.withColumn("date", to_timestamp(df_review.date, 'yyyy-MM-dd HH:mm:ss'))

In [27]:
df_review.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [28]:
df_review.count()

8021122

In [29]:
# Writing the cleaned review DataFrame to our cloud storage bucket
df_review.write.option("header", "true").option("sep", ",").mode("overwrite").csv("gs://ca4022_yelp_data/data_clean/review.csv")

## Cleaning the User Table

In [30]:
df_user = spark.read.json("gs://ca4022_yelp_data/data_raw/yelp_academic_dataset_user.json")

In [31]:
df_user.limit(10).toPandas()

Unnamed: 0,average_stars,compliment_cool,compliment_cute,compliment_funny,compliment_hot,compliment_list,compliment_more,compliment_note,compliment_photos,compliment_plain,...,cool,elite,fans,friends,funny,name,review_count,useful,user_id,yelping_since
0,3.57,22,0,22,3,1,2,11,0,15,...,227,,14,"oeMvJh94PiGQnx_6GlndPQ, wm1z1PaJKvHgSDRKfwhfDg...",225,Rafael,553,628,ntlvfPzc8eglqvk92iDIAw,2007-07-06 03:27:11
1,3.84,63,2,63,36,1,4,33,5,37,...,400,200820092010201120122013,27,"ly7EnE8leJmyqyePVYFlug, pRlR63iDytsnnniPb3AOug...",316,Michelle,564,790,FOBRPlBHa3WPHFB5qYDlVg,2008-04-28 01:29:25
2,3.44,17,1,17,9,0,6,3,1,7,...,103,2010,5,"Uwlk0txjQBPw_JhHsQnyeg, Ybxr1tSCkv3lYA0I1qmnPQ...",125,Martin,60,151,zZUnPeh2hEp0WydbAZEOOg,2008-08-28 23:40:05
3,3.08,7,0,7,2,0,1,7,0,14,...,84,2009,6,"iog3Nyg1i4jeumiTVG_BSA, M92xWY2Vr9w0xoH8bPplfQ...",160,John,206,233,QaELAmRcDc5TfJEylaaP8g,2008-09-20 00:08:14
4,4.37,31,1,31,8,1,9,22,31,28,...,512,200920102011201220142015201620172018,78,"3W3ZMSthojCUirKEqAwGNw, eTIbuu23j9tOgmIa9POyLQ...",400,Anne,485,1265,xvu8G900tezTzbbfqmTKvA,2008-08-09 00:30:27
5,2.88,32,7,32,24,2,16,60,9,36,...,155,2007,15,"E-fXXmALnZTPmzzL6rV4cQ, L2g6vS1GDfqLEpiDOFfCkA...",192,Steve,186,642,z5_82komKV3mI4ASGe2-FQ,2007-02-27 07:09:40
6,4.0,0,0,0,0,0,0,2,1,3,...,6,,0,"1pKOc55fFx09J5t2rPzTxw, HYpE14yYEKSgu3VNVzObHA...",4,Stuart,12,29,ttumcu6hWshk_EJVWrduDg,2010-05-12 16:30:08
7,3.63,623,35,623,483,24,81,193,172,541,...,2878,20112012201320142015201620172018,137,"c-Dja5bexzEWBufNsHfRrQ, 02HJNyOzzYXvEKVApJb8GQ...",2446,Jennifer,822,4127,f4_MRNHvN-yRn7EA8YWRxg,2011-01-17 00:18:23
8,3.75,0,0,0,0,0,3,3,1,4,...,34,,4,"YwaKGmRNnSa3R3N4Hf9jLw, v9YpDzYkJarRbzvVIY-63g...",21,Justin,14,68,UYACF30806j2mfbB5vdmJA,2007-07-24 23:55:21
9,4.1,37,5,37,39,3,5,18,5,24,...,426,20082009,31,"tnfVwTpuPlR6W2xEAMiqsg, Bd_16T8Dva27lF5LIrUmcQ...",372,Claire,218,587,QG13XBbgHWydzThRBGJtyw,2007-06-04 00:14:10


By quickly looking at a small sample of the data, we can see that it appears relatively clean but more analysis is needed to confirm this. Let's check the schema of the data...

In [32]:
df_user.printSchema()

root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)



We can see from the schema that most of the columns are of the correct type and the only one which really needs to be converted is the *yelping_since* column as it is a timestamp.

In [33]:
# How many rows are in the business table?
df_user.count()

1968703

To determine if we can convert the data in the *yelping_since* column...

In [34]:
print("Yelping Since: ", df_user.filter(to_timestamp(df_user.yelping_since, 'yyyy-MM-dd HH:mm:ss').isNotNull()).count())

Yelping Since:  1968703


All of the rows convert correctly so we just need to cast this column and write to a CSV file.

In [35]:
df_user = df_user.withColumn("yelping_since", to_timestamp(df_user.yelping_since, 'yyyy-MM-dd HH:mm:ss'))

In [36]:
df_user.printSchema()

root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: timestamp (nullable = true)



In [37]:
df_user.count()

1968703

In [38]:
# Writing the cleaned user DataFrame to our cloud storage bucket
df_user.write.option("header", "true").option("sep", ",").mode("overwrite").csv("gs://ca4022_yelp_data/data_clean/user.csv")