# Yelp Project

**Group 1**:
- SEO Yerim
- TIRUMALE LAKSHMANA RAO Kiran
- ZHENG Bin

Using the 6 datasets provided by Yelp, our team built a prediction model to forecast which businesses in North America would start offering delivery or takeout after the first COVID-19 lockdown.

To help businesses survive the COVID-19 pandemic, it is important to understand which key factors helped businesses sustain themselves after the first wave of COVID-19.

### The Project Pipeline:

- **Reading in the Data** - Understanding the 6 datasets provided by Yelp
- **Dropping Unnecessary Columns** - Dropping features that would not contribute to the model's performance, as well as overly complex features
- **Joining all 6 datasets into a basetable** - Transforming and then joining all datasets to obtain one observation per business
- **Preprocessing the basetable** - Cleaning the basetable, handling missing values, and preprocessing categorical and text features for modeling
- **Modeling** - Training 4 different algorithms with hyperparameter tuning, and identifying the most important features

# Reading in the Data

In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import re

### Reading in the Yelp Data
https://www.yelp.com/dataset/documentation/main

Understanding all the datasets provided by Yelp and performing an initial analysis.

In [5]:
# Listing all the datasets available to us
import os
print(os.listdir("./data4"))

['data_exports', 'OneDrive_1_12-04-2021.zip', 'parsed_business_sample.json', 'parsed_checkin_sample.json', 'parsed_covid_sample.json', 'parsed_review_sample.json', 'parsed_tip_sample.json', 'parsed_user_sample.json']


### Business Dataset

In [6]:
# Business Data - Contains business data including location data, attributes, and categories.
business = spark.read.json("/FileStore/tables/Yelp_Data_Group/parsed_business_sample.json")

# Viewing the data
print(business.columns)

# Checking the shape (rows, columns) of the business dataset
print((business.count(), len(business.columns)))

['address', 'attributes.AcceptsInsurance', 'attributes.AgesAllowed', 'attributes.Alcohol', 'attributes.Ambience', 'attributes.BYOB', 'attributes.BYOBCorkage', 'attributes.BestNights', 'attributes.BikeParking', 'attributes.BusinessAcceptsBitcoin', 'attributes.BusinessAcceptsCreditCards', 'attributes.BusinessParking', 'attributes.ByAppointmentOnly', 'attributes.Caters', 'attributes.CoatCheck', 'attributes.Corkage', 'attributes.DietaryRestrictions', 'attributes.DogsAllowed', 'attributes.DriveThru', 'attributes.GoodForDancing', 'attributes.GoodForKids', 'attributes.GoodForMeal', 'attributes.HairSpecializesIn', 'attributes.HappyHour', 'attributes.HasTV', 'attributes.Music', 'attributes.NoiseLevel', 'attributes.Open24Hours', 'attributes.OutdoorSeating', 'attributes.RestaurantsAttire', 'attributes.RestaurantsCounterService', 'attributes.RestaurantsDelivery', 'attributes.RestaurantsGoodForGroups', 'attributes.RestaurantsPriceRange2', 'attributes.RestaurantsReservations', 'attributes.Restaurant

In [7]:
# Replace '.' with '_' in every column name (Spark otherwise reads dots as struct-field access)

for name in business.schema.names:
  business = business.withColumnRenamed(name, name.replace('.', '_'))

In [8]:
# Checking the schema as well as confirming the renaming of all columns
business.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes_AcceptsInsurance: string (nullable = true)
 |-- attributes_AgesAllowed: string (nullable = true)
 |-- attributes_Alcohol: string (nullable = true)
 |-- attributes_Ambience: string (nullable = true)
 |-- attributes_BYOB: string (nullable = true)
 |-- attributes_BYOBCorkage: string (nullable = true)
 |-- attributes_BestNights: string (nullable = true)
 |-- attributes_BikeParking: string (nullable = true)
 |-- attributes_BusinessAcceptsBitcoin: string (nullable = true)
 |-- attributes_BusinessAcceptsCreditCards: string (nullable = true)
 |-- attributes_BusinessParking: string (nullable = true)
 |-- attributes_ByAppointmentOnly: string (nullable = true)
 |-- attributes_Caters: string (nullable = true)
 |-- attributes_CoatCheck: string (nullable = true)
 |-- attributes_Corkage: string (nullable = true)
 |-- attributes_DietaryRestrictions: string (nullable = true)
 |-- attributes_DogsAllowed: string (nullable = true)
 |-- attributes

In [9]:
# Checking the unique values of business_id
business.select('business_id').distinct().count()

19018

### Checkin Dataset

In [10]:
# Checkin Data - Checkins on a business.
checkin = spark.read.json("/FileStore/tables/Yelp_Data_Group/parsed_checkin_sample.json")

# Viewing the data
print(checkin.columns)

# Checking the shape (rows, columns) of the checkin dataset
print((checkin.count(), len(checkin.columns)))

['business_id', 'date']
(1990914, 2)


In [11]:
checkin.show(10, False)

+----------------------+--------------------+
|business_id           |date                |
+----------------------+--------------------+
|hihud--QRriCYZw1zZvW4g|2013-03-17 00:41:53 |
|hihud--QRriCYZw1zZvW4g| 2013-03-17 02:41:06|
|hihud--QRriCYZw1zZvW4g| 2013-03-18 06:26:54|
|hihud--QRriCYZw1zZvW4g| 2013-03-19 00:13:05|
|hihud--QRriCYZw1zZvW4g| 2013-03-23 04:27:56|
|hihud--QRriCYZw1zZvW4g| 2013-03-30 01:18:59|
|hihud--QRriCYZw1zZvW4g| 2013-04-02 04:57:07|
|hihud--QRriCYZw1zZvW4g| 2013-04-04 02:15:51|
|hihud--QRriCYZw1zZvW4g| 2013-04-04 08:44:58|
|hihud--QRriCYZw1zZvW4g| 2013-04-07 06:25:42|
+----------------------+--------------------+
only showing top 10 rows



In [12]:
# Checking the unique values of business_id
checkin.select('business_id').distinct().count()

16244

In [13]:
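# Use the legacy (SimpleDateFormat-based) timestamp parser
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

# Remove HTML-escaped apostrophe residue ('&#39') from the date strings
checkin = checkin.withColumn('date', regexp_replace('date', '&#39', ''))

# Parse the cleaned strings into proper timestamps
checkin = checkin.select("business_id", to_timestamp(checkin.date, 'yyyy-MM-dd HH:mm:ss').alias('date'))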

In [14]:
checkin.show(10, False)

+----------------------+-------------------+
|business_id           |date               |
+----------------------+-------------------+
|hihud--QRriCYZw1zZvW4g|2013-03-17 00:41:53|
|hihud--QRriCYZw1zZvW4g|2013-03-17 02:41:06|
|hihud--QRriCYZw1zZvW4g|2013-03-18 06:26:54|
|hihud--QRriCYZw1zZvW4g|2013-03-19 00:13:05|
|hihud--QRriCYZw1zZvW4g|2013-03-23 04:27:56|
|hihud--QRriCYZw1zZvW4g|2013-03-30 01:18:59|
|hihud--QRriCYZw1zZvW4g|2013-04-02 04:57:07|
|hihud--QRriCYZw1zZvW4g|2013-04-04 02:15:51|
|hihud--QRriCYZw1zZvW4g|2013-04-04 08:44:58|
|hihud--QRriCYZw1zZvW4g|2013-04-07 06:25:42|
+----------------------+-------------------+
only showing top 10 rows
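Most raw `date` strings printed before the conversion carry a leading space, and the cell also strips an HTML-escaped apostrophe (`&#39`). Below is a minimal plain-Python sketch of the same clean-then-parse idea. It is not the notebook's Spark code, and it assumes these are the only two kinds of residue. In Spark, the analogous extra safeguard would be wrapping the column in `trim(...)` from `pyspark.sql.functions` before `to_timestamp`.

```python
from datetime import datetime

def parse_checkin(raw: str) -> datetime:
    """Remove '&#39' residue and surrounding whitespace, then parse the timestamp."""
    cleaned = raw.replace("&#39", "").strip()
    return datetime.strptime(cleaned, "%Y-%m-%d %H:%M:%S")

# Sample strings modelled on the raw output above (the '&#39' one is hypothetical)
samples = ["2013-03-17 00:41:53", " 2013-03-17 02:41:06", "&#39 2013-03-18 06:26:54"]
parsed = [parse_checkin(s) for s in samples]
print(parsed[1])  # 2013-03-17 02:41:06
```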



### Review Dataset

In [15]:
# Review Data - Contains full review text data including the user_id that wrote the review and the business_id the review is written for.
review = spark.read.json("/FileStore/tables/Yelp_Data_Group/parsed_review_sample.json")

# Viewing the data
print(review.columns)

# Checking the shape (rows, columns) of the review dataset
print((review.count(), len(review.columns)))

['business_id', 'cool', 'date', 'funny', 'review_id', 'stars', 'text', 'useful', 'user_id']
(500000, 9)


In [16]:
review.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: long (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [17]:
# Checking the unique values of business_id
review.select('business_id').distinct().count()

19018

In [18]:
# Checking the unique values of user_id
review.select('user_id').distinct().count()

305084

### Tip Dataset

In [19]:
# Tip Data - Tips written by a user on a business. Tips are shorter than reviews and tend to convey quick suggestions.
tip = spark.read.json("/FileStore/tables/Yelp_Data_Group/parsed_tip_sample.json")

# Viewing the data
print(tip.columns)

# Checking the shape (rows, columns) of the tip dataset
print((tip.count(), len(tip.columns)))

['business_id', 'compliment_count', 'date', 'text', 'user_id']
(124161, 5)


In [20]:
tip.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- compliment_count: long (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: string (nullable = true)



In [21]:
# Checking the unique values of business_id
tip.select('business_id').distinct().count()

12578

In [22]:
# Checking the unique values of user_id
tip.select('user_id').distinct().count()

68774

### User Dataset

In [23]:
# User Data - User data including the user's friend mapping and all the metadata associated with the user.
user = spark.read.json("/FileStore/tables/Yelp_Data_Group/parsed_user_sample.json")

# Viewing the data
print(user.columns)

# Checking the shape (rows, columns) of the user dataset
print((user.count(), len(user.columns)))

['average_stars', 'compliment_cool', 'compliment_cute', 'compliment_funny', 'compliment_hot', 'compliment_list', 'compliment_more', 'compliment_note', 'compliment_photos', 'compliment_plain', 'compliment_profile', 'compliment_writer', 'cool', 'elite', 'fans', 'friends', 'funny', 'name', 'review_count', 'useful', 'user_id', 'yelping_since']
(305084, 22)


In [24]:
user.printSchema()

root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)



In [25]:
# Checking the unique values of user_id
user.select('user_id').distinct().count()

305084

### Covid Dataset

In [26]:
# Reading in the Covid Features Data
covid = spark.read.json("/FileStore/tables/Yelp_Data_Group/parsed_covid_sample.json")

# Viewing the data
print(covid.columns)

# Checking the shape (rows, columns) of the covid dataset
print((covid.count(), len(covid.columns)))

['Call To Action enabled', 'Covid Banner', 'Grubhub enabled', 'Request a Quote Enabled', 'Temporary Closed Until', 'Virtual Services Offered', 'business_id', 'delivery or takeout', 'highlights']
(19053, 9)


In [27]:
covid.printSchema()

root
 |-- Call To Action enabled: string (nullable = true)
 |-- Covid Banner: string (nullable = true)
 |-- Grubhub enabled: string (nullable = true)
 |-- Request a Quote Enabled: string (nullable = true)
 |-- Temporary Closed Until: string (nullable = true)
 |-- Virtual Services Offered: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- delivery or takeout: string (nullable = true)
 |-- highlights: string (nullable = true)



In [28]:
# Checking the unique values of business_id
covid.select('business_id').distinct().count()

19018

In [29]:
# Dropping duplicate rows per business_id (19053 rows but only 19018 unique IDs)
covid = covid.dropDuplicates(["business_id"])

In [30]:
# Check distinct values of the column
covid.select('delivery or takeout').distinct().show(truncate = False)

# Count NaN values in 'delivery or takeout'
# (isnan does not flag nulls; use col('delivery or takeout').isNull() to count those)
from pyspark.sql.functions import isnan, when, count, col
covid.select([count(when(isnan('delivery or takeout'),True))]).show()

+-------------------+
|delivery or takeout|
+-------------------+
|FALSE              |
|TRUE               |
+-------------------+

+---------------------------------------------------------+
|count(CASE WHEN isnan(delivery or takeout) THEN true END)|
+---------------------------------------------------------+
|                                                        0|
+---------------------------------------------------------+



In [31]:
# Counting the FALSE and TRUE values in the column 'delivery or takeout'
covid.select('delivery or takeout').groupBy('delivery or takeout').count().show()

+-------------------+-----+
|delivery or takeout|count|
+-------------------+-----+
|              FALSE|12794|
|               TRUE| 6224|
+-------------------+-----+
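These counts show a moderately imbalanced target: roughly one business in three offers delivery or takeout. Using only the counts printed above, a quick plain-Python calculation gives the positive rate. It also gives the accuracy of a trivial classifier that always predicts FALSE, which is a useful floor when reading model metrics later.

```python
# Label counts copied from the groupBy output above
counts = {"FALSE": 12794, "TRUE": 6224}
total = sum(counts.values())

positive_rate = counts["TRUE"] / total
majority_baseline = max(counts.values()) / total  # accuracy of always predicting FALSE

print(total)                        # 19018
print(round(positive_rate, 3))      # 0.327
print(round(majority_baseline, 3))  # 0.673
```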



In [32]:
# Checking for fully duplicated rows in the covid dataset (a null sum means there are none)
import pyspark.sql.functions as f
covid.groupBy(covid.columns)\
    .count()\
    .where(f.col('count') > 1)\
    .select(f.sum('count'))\
    .show()

+----------+
|sum(count)|
+----------+
|      null|
+----------+



# Dropping Unnecessary Columns

Before merging the datasets above into a single basetable, we drop the columns our machine learning algorithms will not need, that is, columns we expect would not improve their predictive performance.

In [33]:
# List of columns of each dataset to be dropped

business_drop = ['address',  'name',  'attributes_Ambience', 'attributes_BYOBCorkage', 'attributes_BestNights', 'attributes_BusinessParking', 'attributes_DietaryRestrictions', 'attributes_GoodForMeal', 'attributes_HairSpecializesIn', 'attributes_Music', 'attributes_NoiseLevel', 'attributes_RestaurantsAttire', 'attributes_Smoking', 'attributes_WiFi', 'hours_Monday', 'hours_Tuesday', 'hours_Wednesday', 'hours_Thursday', 'hours_Friday', 'hours_Saturday', 'hours_Sunday', 'attributes_RestaurantsPriceRange2', 'latitude', 'longitude']
# 'city', 'categories', 'postal_code', 'state', 'attributes_AgesAllowed', 'attributes_Alcohol', 'latitude', 'longitude'

review_drop = ['date', 'review_id', 'text']

tip_drop = ['text', 'date']

user_drop = ['elite', 'friends', 'name']

covid_drop = ['highlights', 'Temporary Closed Until']
# 'Virtual Services Offered', 'Covid Banner'

In [34]:
# Dropping the columns from the dataframes
business = business.drop(*business_drop)
review = review.drop(*review_drop)
tip = tip.drop(*tip_drop)
user = user.drop(*user_drop)
covid = covid.drop(*covid_drop)

# Joining the tables to form the `basetable`

### Joining the `Review` and `User` Dataframes into `review_user`

In [35]:
# Renaming the `cool`, `funny` and `useful` columns of the review dataframe so they do not clash with the same-named columns in the user dataframe
review = review.withColumnRenamed("cool", "review_cool")
review = review.withColumnRenamed("funny", "review_funny")
review = review.withColumnRenamed("useful", "review_useful")

Step 1: Performing an inner join between the two dataframes

In [36]:
# performing an inner join of the review and user dataframes
review_user = review.join(user,review.user_id ==  user.user_id,"inner").drop(user.user_id)

In [37]:
# checking the shape of the joined dataframe
print((review_user.count(), len(review_user.columns)))

(500000, 24)


Step 2: Grouping by `business_id` and averaging the numeric columns

In [38]:
# Now we can drop the user_id column
review_user = review_user.drop("user_id")

In [39]:
review_user = review_user.groupBy("business_id").mean()

In [40]:
# checking the shape of the aggregated dataframe
print((review_user.count(), len(review_user.columns)))

(19018, 22)
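Called without arguments, `groupBy(...).mean()` averages every numeric column and ignores non-numeric ones. That explains why the string column `yelping_since` disappears here and the result holds `business_id` plus 21 `avg(...)` columns. Here is a minimal plain-Python sketch of that behaviour on made-up rows; the values and the reduced set of columns are purely illustrative.

```python
from collections import defaultdict
from numbers import Number

def group_mean(rows, key):
    """Average every numeric field per key; non-numeric fields and nulls are skipped, like Spark's avg."""
    sums = defaultdict(lambda: defaultdict(float))
    counts = defaultdict(lambda: defaultdict(int))
    for row in rows:
        k = row[key]
        for name, value in row.items():
            # Only numeric, non-null values contribute (bool is excluded, as in Spark)
            if name != key and isinstance(value, Number) and not isinstance(value, bool):
                sums[k][name] += value
                counts[k][name] += 1
    return {
        k: {f"avg({name})": total / counts[k][name] for name, total in cols.items()}
        for k, cols in sums.items()
    }

# Hypothetical review_user rows: 'yelping_since' is a string and is ignored
rows = [
    {"business_id": "b1", "stars": 4, "fans": 10, "yelping_since": "2015-01-01"},
    {"business_id": "b1", "stars": 2, "fans": 0, "yelping_since": "2018-06-30"},
    {"business_id": "b2", "stars": 5, "fans": 3, "yelping_since": "2012-03-14"},
]
result = group_mean(rows, "business_id")
print(result["b1"])  # {'avg(stars)': 3.0, 'avg(fans)': 5.0}
```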


In [41]:
# Checking the unique values of business_id
review_user.select('business_id').distinct().count()

19018

### Joining the `review_user` dataframe with `business` into `yelp_datamart`

In [42]:
# performing an inner join of the review_user and business dataframes
yelp_datamart = business.join(review_user, business.business_id ==  review_user.business_id,"inner").drop(review_user.business_id)

In [43]:
# checking the shape of the joined dataframe
print((yelp_datamart.count(), len(yelp_datamart.columns)))

(19018, 55)


In [44]:
# Checking the unique values of business_id
yelp_datamart.select('business_id').distinct().count()

19018

### Joining the `tip` dataframe to the `yelp_datamart`

Step 1: Processing the `tip` data: grouping by `business_id` and summing `compliment_count`

In [45]:
tip = tip.select("business_id", "compliment_count").groupBy("business_id").sum()

In [46]:
tip.select("business_id").distinct().count()

12578

In [47]:
# Renaming the column back to compliment_count
tip = tip.withColumnRenamed("sum(compliment_count)", "compliment_count")

Step 2: Performing a left join

Since only 12578 businesses have tips, we perform a left join so that all 19018 businesses are kept; businesses without tips get a null `compliment_count`.

In [48]:
# performing a left join of the yelp_datamart and tip dataframes
yelp_datamart = yelp_datamart.join(tip, yelp_datamart.business_id ==  tip.business_id,"left").drop(tip.business_id)

In [49]:
# checking the shape of the joined dataframe
print((yelp_datamart.count(), len(yelp_datamart.columns)))

(19018, 56)
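The row count stays at 19018 because a left join keeps every row of the left table and fills the right-hand columns with nulls where no match exists. An inner join would have shrunk the basetable to the 12578 businesses with tips. Below is a toy plain-Python illustration with hypothetical IDs, where `None` stands in for Spark's null.

```python
def left_join(left_ids, right, default=None):
    """Return (id, value) for every left id; ids missing from `right` get `default` (Spark: null)."""
    return [(i, right.get(i, default)) for i in left_ids]

businesses = ["b1", "b2", "b3"]
tip_compliments = {"b1": 4, "b3": 0}  # hypothetical: b2 has no tips

joined = left_join(businesses, tip_compliments)
print(joined)  # [('b1', 4), ('b2', None), ('b3', 0)]
```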


### Joining the `checkin` dataframe to the `yelp_datamart`

Step 1: Processing the `checkin` data: grouping by `business_id` and counting the check-ins

In [50]:
checkin = checkin.groupBy("business_id").count()

In [51]:
# Renaming the count column to checkin_count
checkin = checkin.withColumnRenamed("count", "checkin_count")

In [52]:
# Checking number of unique business ids
checkin.select("business_id").distinct().count()

16244

Step 2: Performing a left join

Since only 16244 businesses have check-ins, we perform a left join so that all 19018 businesses are kept; businesses without check-ins get a null `checkin_count`.

In [53]:
# performing a left join of the yelp_datamart and checkin dataframes
yelp_datamart = yelp_datamart.join(checkin, yelp_datamart.business_id ==  checkin.business_id,"left").drop(checkin.business_id)

In [54]:
# checking the shape of the joined dataframe
print((yelp_datamart.count(), len(yelp_datamart.columns)))

(19018, 57)


### Joining the `covid` dataframe to the `yelp_datamart`

In [55]:
# performing an inner join of the yelp_datamart and covid dataframes
yelp_datamart = yelp_datamart.join(covid, yelp_datamart.business_id ==  covid.business_id,"inner").drop(covid.business_id)

In [56]:
# checking the shape of the joined dataframe
print((yelp_datamart.count(), len(yelp_datamart.columns)))

(19018, 63)


In [57]:
yelp_datamart.select('delivery or takeout').groupBy('delivery or takeout').count().show()

+-------------------+-----+
|delivery or takeout|count|
+-------------------+-----+
|              FALSE|12794|
|               TRUE| 6224|
+-------------------+-----+



In [59]:
basetable = yelp_datamart

# Pre-Processing the Basetable

### Checking for missing values

In [60]:
# checking the shape of the basetable
print((basetable.count(), len(basetable.columns)))

(19018, 63)


In [61]:
# Checking for missing values in the Basetable
from pyspark.sql.functions import when, count, col
basetable.select([count(when(col(c).isNull(), c)).alias(c) for c in 
           basetable.columns]).toPandas().T

| Column | Missing values |
|---|---|
| attributes_AcceptsInsurance | 18318 |
| attributes_AgesAllowed | 19006 |
| attributes_Alcohol | 14163 |
| attributes_BYOB | 18977 |
| attributes_BikeParking | 10541 |
| ... | ... |
| Covid Banner | 0 |
| Grubhub enabled | 0 |
| Request a Quote Enabled | 0 |
| Virtual Services Offered | 0 |


### Dropping columns with over 80% missing values

80% of a basetable with 19018 rows is 15214 missing values per column.

Hence, we drop every column with more than 15200 missing values (the threshold rounded down slightly).
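The list of columns to drop can also be derived from the null counts instead of copied by hand. The plain-Python sketch below uses only the five counts visible in the truncated output above and the exact 80% threshold. In the notebook, the dictionary could be built from the `toPandas()` result of the missing-value cell; that wiring is an assumption, not something the original does.

```python
# Null counts per column, copied from the missing-value check above (subset only)
null_counts = {
    "attributes_AcceptsInsurance": 18318,
    "attributes_AgesAllowed": 19006,
    "attributes_Alcohol": 14163,
    "attributes_BYOB": 18977,
    "attributes_BikeParking": 10541,
}
n_rows = 19018
threshold = 0.8 * n_rows  # about 15214.4 missing values

to_drop = [c for c, n in null_counts.items() if n > threshold]
print(to_drop)  # ['attributes_AcceptsInsurance', 'attributes_AgesAllowed', 'attributes_BYOB']
```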

In [62]:
# columns with over 15200 missing values:
missing_cols = ['attributes_AcceptsInsurance', 'attributes_AgesAllowed', 'attributes_BYOB', 'attributes_BusinessAcceptsBitcoin', 'attributes_Caters', 'attributes_CoatCheck', 'attributes_Corkage', 'attributes_DogsAllowed', 'attributes_DriveThru', 'attributes_GoodForDancing', 'attributes_HappyHour', 'attributes_Open24Hours', 'attributes_RestaurantsCounterService', 'attributes_RestaurantsTableService', 'attributes_WheelchairAccessible']

In [63]:
# Dropping all above columns that have over 80% missing values
basetable = basetable.drop(*missing_cols)

In [64]:
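# Replacing missing values with -1. With a numeric fill value, na.fill only touches
# numeric columns, so string columns (e.g. attributes_Alcohol) keep their nulls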
basetable = basetable.na.fill(-1)

In [65]:
# checking the shape of the basetable
print((basetable.count(), len(basetable.columns)))

(19018, 48)


###  Converting boolean columns to integer values

Several columns should contain only boolean values (True and False). We preprocess these first: every True becomes 1, every False becomes 0, and any other value becomes -1 to mark it as missing.

In [66]:
# List of boolean columns
cols=['attributes_BikeParking','attributes_BusinessAcceptsCreditCards','attributes_ByAppointmentOnly','attributes_GoodForKids','attributes_HasTV','attributes_OutdoorSeating','attributes_RestaurantsDelivery','attributes_RestaurantsGoodForGroups','attributes_RestaurantsReservations','attributes_RestaurantsTakeOut', 'Call To Action enabled','Grubhub enabled','Request a Quote Enabled','delivery or takeout']

In [67]:
for col_name in cols:
  basetable = basetable.withColumn(col_name, when(basetable[col_name] == "True", 1)
                                 .when(basetable[col_name] == "TRUE", 1)
                                 .when(basetable[col_name] == "False", 0)
                                 .when(basetable[col_name] == "FALSE", 0)
                                 .otherwise(-1))
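The `when`/`otherwise` chain above is case-sensitive and treats nulls as "no match", so they fall through to -1. Here is a minimal plain-Python mirror of the same rule, which can help when unit-testing it. The sample values are hypothetical, and the literal `'None'` string is only included in case such values occur.

```python
def to_flag(value):
    """Mirror the when/otherwise chain: 'True'/'TRUE' -> 1, 'False'/'FALSE' -> 0, anything else -> -1."""
    if value in ("True", "TRUE"):
        return 1
    if value in ("False", "FALSE"):
        return 0
    return -1  # nulls, 'None' strings and differently-cased values are treated as missing

print([to_flag(v) for v in ["True", "FALSE", "None", None, "true"]])  # [1, 0, -1, -1, -1]
```

Note that a lowercase `'true'` maps to -1 under this rule, exactly as it would in the Spark expression.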

In [68]:
# Confirming the above change, by checking the delivery or takeout column
basetable.select('delivery or takeout').groupBy('delivery or takeout').count().show()

+-------------------+-----+
|delivery or takeout|count|
+-------------------+-----+
|                  1| 6224|
|                  0|12794|
+-------------------+-----+



In [69]:
# Checking for missing values in the cleaned and processed Basetable
from pyspark.sql.functions import when, count, col
basetable.select([count(when(col(c).isNull(), c)).alias(c) for c in 
           basetable.columns]).toPandas().T

| Column | Missing values |
|---|---|
| attributes_Alcohol | 14163 |
| attributes_BikeParking | 0 |
| attributes_BusinessAcceptsCreditCards | 0 |
| attributes_ByAppointmentOnly | 0 |
| attributes_GoodForKids | 0 |
| attributes_HasTV | 0 |
| attributes_OutdoorSeating | 0 |
| attributes_RestaurantsDelivery | 0 |
| attributes_RestaurantsGoodForGroups | 0 |
| attributes_RestaurantsReservations | 0 |


#### Handling the `Virtual Services Offered` column

In [70]:
# Checking the unique values of the virtual services offered column
basetable.select("Virtual Services Offered").distinct().show(truncate=False)

+-------------------------------+
|Virtual Services Offered       |
+-------------------------------+
|FALSE                          |
|offers_virtual_tasting_sessions|
|offers_virtual_experiences     |
|offers_virtual_tours           |
|offers_virtual_performances    |
|offers_virtual_classes         |
|offers_virtual_consultations   |
+-------------------------------+



The only negative value is FALSE; every other value names a type of virtual service that was offered.
We therefore map FALSE to 0 and all other values to 1.

In [71]:
basetable = basetable.withColumn("Virtual Services Offered", when(basetable["Virtual Services Offered"] == "FALSE", 0)
                                 .when(basetable["Virtual Services Offered"] == "False", 0)
                                 .otherwise(1))

In [72]:
# Now confirming the above changes made to the virtual services offered column
basetable.select("Virtual Services Offered").distinct().show(truncate=False)

+------------------------+
|Virtual Services Offered|
+------------------------+
|1                       |
|0                       |
+------------------------+



### Converting the Target column to Double format

Converting the target column `delivery or takeout` to double format and renaming it to `label`

In [73]:
basetable = basetable.withColumn("delivery or takeout", basetable["delivery or takeout"].cast("double"))\
                     .withColumnRenamed("delivery or takeout","label")

### Processing the categorical columns

**STEP 1: The following categorical columns need to be processed using `StringIndexer`**

- `attributes_Alcohol`
- `city`
- `postal_code`
- `state`


In [74]:
# Checking for missing values in the 4 columns above

cat_cols = ['attributes_Alcohol', 'city', 'postal_code', 'state']

from pyspark.sql.functions import when, count, col
basetable.select([count(when(col(c).isNull(), c)).alias(c) for c in 
           cat_cols]).toPandas().T

| Column | Missing values |
|---|---|
| attributes_Alcohol | 14163 |
| city | 0 |
| postal_code | 0 |
| state | 0 |


In [75]:
# Replacing the missing values of attributes_Alcohol with -1
basetable = basetable.fillna( {'attributes_Alcohol':-1 } )

In [76]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline


# attributes_Alcohol
alcoholIndxr = StringIndexer().setInputCol("attributes_Alcohol").setOutputCol("Alcohol_Ind")

# city
cityIndxr = StringIndexer().setInputCol("city").setOutputCol("city_Ind")

# postal_code
postal_codeIndxr = StringIndexer().setInputCol("postal_code").setOutputCol("postal_code_Ind")

# state
stateIndxr = StringIndexer().setInputCol("state").setOutputCol("state_Ind")


pipe_catv = Pipeline(stages=[alcoholIndxr, cityIndxr, postal_codeIndxr, stateIndxr])
basetable = pipe_catv.fit(basetable).transform(basetable)
basetable = basetable.drop("attributes_Alcohol", "city", "postal_code", "state" )

In [77]:
# Seeing the above changes
basetable.select('Alcohol_Ind', 'city_Ind', 'postal_code_Ind', 'state_Ind').show(3)

+-----------+--------+---------------+---------+
|Alcohol_Ind|city_Ind|postal_code_Ind|state_Ind|
+-----------+--------+---------------+---------+
|        0.0|     0.0|           74.0|      1.0|
|        0.0|    14.0|           28.0|      0.0|
|        0.0|     1.0|         3656.0|      2.0|
+-----------+--------+---------------+---------+
only showing top 3 rows
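By default, `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`), and ties are broken alphabetically. So `city_Ind == 0.0` is the most common city in the basetable. The plain-Python sketch below reproduces that ordering on hypothetical state values. Keep in mind the resulting indices are arbitrary ordinals: tree-based models split on them freely, while linear models would read them as magnitudes.

```python
from collections import Counter

def string_index(values):
    """Label -> index, most frequent first; ties broken alphabetically (frequencyDesc order)."""
    freq = Counter(values)
    ordered = sorted(freq, key=lambda label: (-freq[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

states = ["AZ", "NV", "AZ", "ON", "NV", "AZ"]  # hypothetical sample
print(string_index(states))  # {'AZ': 0.0, 'NV': 1.0, 'ON': 2.0}
```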



**STEP 2: Let us now one-hot encode all the boolean columns we cleaned earlier.**

Because there are many boolean columns, we apply `StringIndexer` followed by `OneHotEncoder` to each of them in a loop.

In [78]:
bool_cols=['attributes_BikeParking','attributes_BusinessAcceptsCreditCards','attributes_ByAppointmentOnly','attributes_GoodForKids','attributes_HasTV','attributes_OutdoorSeating','attributes_RestaurantsDelivery','attributes_RestaurantsGoodForGroups','attributes_RestaurantsReservations','attributes_RestaurantsTakeOut','is_open', 'Call To Action enabled','Grubhub enabled','Request a Quote Enabled','Virtual Services Offered']

In [79]:
# first applying the stringindexer, then onehotencoding in a loop for all columns from bool_cols
for my_col in bool_cols:
    my_col_ind = my_col + "Ind"
    my_col_dum = my_col + "_dum"
    model = StringIndexer().setInputCol(my_col).setOutputCol(my_col_ind)
    ohe = OneHotEncoder(inputCols=[my_col_ind],outputCols=[my_col_dum])
    pipe = Pipeline(stages=[model, ohe])
    basetable = pipe.fit(basetable).transform(basetable)
    basetable = basetable.drop(my_col, my_col_ind)

In [81]:
# Checking the above changes on the first 3 columns updated
basetable.select('attributes_BikeParking_dum','attributes_BusinessAcceptsCreditCards_dum','attributes_ByAppointmentOnly_dum').show(3)

+--------------------------+-----------------------------------------+--------------------------------+
|attributes_BikeParking_dum|attributes_BusinessAcceptsCreditCards_dum|attributes_ByAppointmentOnly_dum|
+--------------------------+-----------------------------------------+--------------------------------+
|             (2,[0],[1.0])|                            (2,[1],[1.0])|                   (2,[0],[1.0])|
|             (2,[0],[1.0])|                            (2,[0],[1.0])|                   (2,[0],[1.0])|
|             (2,[0],[1.0])|                            (2,[1],[1.0])|                   (2,[0],[1.0])|
+--------------------------+-----------------------------------------+--------------------------------+
only showing top 3 rows
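Each value such as `(2,[0],[1.0])` is a sparse vector: size 2, with 1.0 at position 0. `OneHotEncoder` drops the last category by default (`dropLast=True`), so a column with three distinct indexed values (for example 1, 0 and -1 after the earlier mapping) yields vectors of size 2. The highest-index category, which is the least frequent under the default `StringIndexer` ordering, then becomes the all-zero vector. The plain-Python sketch below models only that size/index logic; it is not Spark's implementation.

```python
def one_hot(index, num_categories, drop_last=True):
    """Return (size, indices, values), like a Spark SparseVector printed as (size,[indices],[values])."""
    size = num_categories - 1 if drop_last else num_categories
    if index < size:
        return (size, [index], [1.0])
    return (size, [], [])  # the dropped (last) category is encoded as all zeros

print(one_hot(0, 3))  # (2, [0], [1.0])
print(one_hot(2, 3))  # (2, [], [])
```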



### Processing the text columns

We have only 2 text columns 
- `categories`
- `Covid Banner`

In [82]:
# Checking for missing values of above 2 columns
text_cols = ['categories', 'Covid Banner']

from pyspark.sql.functions import when, count, col
basetable.select([count(when(col(c).isNull(), c)).alias(c) for c in 
           text_cols]).toPandas().T

| Column | Missing values |
|---|---|
| categories | 39 |
| Covid Banner | 0 |


In [83]:
# Replacing the missing values of categories with -1 (Covid Banner has none)
basetable = basetable.fillna( { 'categories':-1 } )

In [84]:
from pyspark.ml.feature import HashingTF, Tokenizer, CountVectorizer
# Tokenizing
tok_categories = Tokenizer(inputCol="categories", outputCol="categories_words")
tok_covid_banner = Tokenizer(inputCol="Covid Banner", outputCol="covid_banner_words")

# Note: stop-word removal is not needed in our case

# # Using Hashing Method
# hashingTF_categories = HashingTF(inputCol="categories_words", outputCol="categories_txt")
# hashingTF_covid_banner = HashingTF(inputCol="covid_banner_words", outputCol="covid_banner_txt")

# Using CountVectorizer method
cv_categories = CountVectorizer(inputCol="categories_words", outputCol="categories_txt")
cv_covid_banner = CountVectorizer(inputCol="covid_banner_words", outputCol="covid_banner_txt")

# Defining the pipeline
pipeline = Pipeline(stages=[tok_categories, cv_categories, tok_covid_banner, cv_covid_banner])

# Fit the pipeline on the basetable and transform it
basetable = pipeline.fit(basetable).transform(basetable)

# Dropping the columns
basetable = basetable.drop("categories", "categories_words", "Covid Banner", "covid_banner_words")

In [85]:
# confirming the above changes
basetable.select("categories_txt", "covid_banner_txt").show(3)

+--------------------+----------------+
|      categories_txt|covid_banner_txt|
+--------------------+----------------+
|(1777,[0,84,94,16...|(5795,[0],[1.0])|
|(1777,[0,1,4,14,2...|(5795,[0],[1.0])|
|(1777,[2,3,96,266...|(5795,[0],[1.0])|
+--------------------+----------------+
only showing top 3 rows
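`Tokenizer` lowercases text and splits on whitespace only. A comma-separated field such as `categories` therefore produces tokens with trailing commas (e.g. `'pizza,'`), and multi-word categories are broken into separate words. The plain-Python sketch below contrasts that behaviour with splitting on commas, which is roughly what `RegexTokenizer(pattern=",\\s*")` would do. The category string is made up, and whether comma splitting actually improves the model has not been tested here.

```python
import re

def spark_tokenizer(text):
    """Approximate pyspark.ml.feature.Tokenizer: lowercase, then split on whitespace."""
    return text.lower().split()

def comma_tokenizer(text):
    """Approximate RegexTokenizer with pattern ',\\s*': lowercase, then split on commas."""
    return [t for t in re.split(r",\s*", text.lower()) if t]

categories = "Restaurants, Pizza, Ice Cream & Frozen Yogurt"  # hypothetical value
print(spark_tokenizer(categories))
# ['restaurants,', 'pizza,', 'ice', 'cream', '&', 'frozen', 'yogurt']
print(comma_tokenizer(categories))
# ['restaurants', 'pizza', 'ice cream & frozen yogurt']
```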



### Creating a backup of the final basetable

In [86]:
# Parquet rejects column names containing characters such as spaces and parentheses,
# so we rename the avg(...) columns before exporting the basetable as a backup
basetable = basetable.withColumnRenamed('avg(review_cool)','avg_review_cool')
basetable = basetable.withColumnRenamed('avg(review_funny)','avg_review_funny')
basetable = basetable.withColumnRenamed('avg(stars)','avg_stars')
basetable = basetable.withColumnRenamed('avg(review_useful)','avg_review_useful')
basetable = basetable.withColumnRenamed('avg(average_stars)','avg_average_stars')
basetable = basetable.withColumnRenamed('avg(compliment_cool)','avg_compliment_cool')
basetable = basetable.withColumnRenamed('avg(compliment_cute)','avg_compliment_cute')
basetable = basetable.withColumnRenamed('avg(compliment_funny)','avg_compliment_funny')
basetable = basetable.withColumnRenamed('avg(compliment_hot)','avg_compliment_hot')
basetable = basetable.withColumnRenamed('avg(compliment_list)','avg_compliment_list')
basetable = basetable.withColumnRenamed('avg(compliment_more)','avg_compliment_more')
basetable = basetable.withColumnRenamed('avg(compliment_note)','avg_compliment_note')
basetable = basetable.withColumnRenamed('avg(compliment_photos)','avg_compliment_photos')
basetable = basetable.withColumnRenamed('avg(compliment_plain)','avg_compliment_plain')
basetable = basetable.withColumnRenamed('avg(compliment_profile)','avg_compliment_profile')
basetable = basetable.withColumnRenamed('avg(compliment_writer)','avg_compliment_writer')
basetable = basetable.withColumnRenamed('avg(cool)','avg_cool')
basetable = basetable.withColumnRenamed('avg(fans)','avg_fans')
basetable = basetable.withColumnRenamed('avg(funny)','avg_funny')
basetable = basetable.withColumnRenamed('avg(review_count)','avg_review_count')
basetable = basetable.withColumnRenamed('avg(useful)','avg_useful')

In [87]:
basetable = basetable.withColumnRenamed('Call To Action enabled_dum','Call_To_Action_enabled_dum')
basetable = basetable.withColumnRenamed('Grubhub enabled_dum','Grubhub_enabled_dum')
basetable = basetable.withColumnRenamed('Request a Quote Enabled_dum','Request_a_Quote_Enabled_dum')
basetable = basetable.withColumnRenamed('Virtual Services Offered_dum','Virtual_Services_Offered_dum')
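The renaming in the two cells above can also be expressed as a single rule. Spark's Parquet writer has historically rejected column names containing any of the characters ` ,;{}()\n\t=`. Below is a minimal sketch that assumes every column either follows the `avg(x)` pattern or only needs its invalid characters replaced by underscores. `parquet_safe_name` is a hypothetical helper and is not part of PySpark.

```python
import re

# Characters that Spark's Parquet writer has historically rejected in column names
INVALID_PARQUET_CHARS = " ,;{}()\n\t="


def parquet_safe_name(name: str) -> str:
    """Hypothetical helper: 'avg(x)' -> 'avg_x', other invalid characters -> '_'."""
    # Turn an aggregate name such as 'avg(stars)' into 'avg_stars'
    match = re.fullmatch(r"(\w+)\((\w+)\)", name)
    if match:
        return f"{match.group(1)}_{match.group(2)}"
    # Replace any remaining invalid character with an underscore
    return "".join("_" if ch in INVALID_PARQUET_CHARS else ch for ch in name)


if __name__ == "__main__":
    for col in ["avg(review_cool)", "avg(average_stars)", "Grubhub enabled_dum", "business_id"]:
        print(col, "->", parquet_safe_name(col))
```

In the notebook, one way to apply it is `basetable = basetable.toDF(*[parquet_safe_name(c) for c in basetable.columns])`. This renames every column positionally in one step.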

In [88]:
# Writing the basetable to Parquet as a single-partition backup
# ("header" is a CSV option and has no effect on Parquet, so it is not set)
basetable.repartition(1).write.format("parquet")\
  .mode('overwrite')\
  .save("/FileStore/tables/Yelp_Data_Group/basetable_final_withOHE.parquet")

In [89]:
# Checking the shape of the final basetable
print((basetable.count(), len(basetable.columns)))


(19018, 48)


In [100]:
basetable_final = basetable

# Modeling

In [3]:
# Reloading the Parquet backup so that modeling can start without rerunning the preprocessing
basetable_final = spark.read.format("parquet").load("/FileStore/tables/Yelp_Data_Group/basetable_final_withOHE.parquet")

In [4]:
# Checking the shape of the loaded basetable
print((basetable_final.count(), len(basetable_final.columns)))

(19018, 48)


### Deleting highly correlated columns

In [5]:
# Dropping columns that are almost direct proxies of the label (delivery/takeout already enabled):
# they leak the target and artificially inflate the AUC
basetable_final = basetable_final.drop('attributes_RestaurantsDelivery_dum', 'attributes_RestaurantsTakeOut_dum', 'Grubhub_enabled_dum')
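Leakage like this can be spotted before modeling by measuring how closely each candidate feature tracks the label. Below is a minimal pure-Python sketch of a Pearson-correlation check. The 0.8 cut-off and the toy columns (including the name `delivery_dum`) are illustrative assumptions, not values from this project. In Spark, the same statistic is available for a pair of columns through `DataFrame.stat.corr(col1, col2)`.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient between two equal-length numeric lists."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return 0.0  # a constant column carries no linear signal
    return cov / math.sqrt(var_x * var_y)


def flag_leaky_features(columns, label, threshold=0.8):
    """Names of features whose absolute correlation with the label exceeds the threshold."""
    return [name for name, values in columns.items()
            if abs(pearson(values, label)) > threshold]


if __name__ == "__main__":
    label = [0, 0, 1, 1, 1, 0]
    columns = {
        "delivery_dum": [0, 0, 1, 1, 1, 0],  # hypothetical column that mirrors the label
        "stars": [4.0, 3.5, 4.5, 3.0, 4.0, 5.0],
    }
    print(flag_leaky_features(columns, label))
```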

### Basetable transformation for modeling

In [6]:
# Transforming the basetable into (label, features) format with RFormula:
# every column except label and business_id becomes a feature, and string columns are one-hot encoded
from pyspark.ml.feature import RFormula

basetable_final = RFormula(formula="label ~ . - business_id").fit(basetable_final).transform(basetable_final)

print("basetable_final dataset no. of obs: " + str(basetable_final.count()))

basetable_final dataset no. of obs: 19018
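In the formula `label ~ . - business_id`, the `.` expands to every column except the label, and `- business_id` then removes the ID. String columns are one-hot encoded. The sketch below imitates these two steps in plain Python to show what ends up in the feature vector. It is a simplified model of RFormula's default behaviour, under these assumptions:

- String levels are indexed by descending frequency, with ties broken alphabetically.
- The last level is dropped as the reference category.

The function names and the toy `city` values are hypothetical.

```python
from collections import Counter


def expand_dot_formula(columns, label="label", removed=("business_id",)):
    """Feature columns selected by a formula of the form 'label ~ . - business_id'."""
    return [c for c in columns if c != label and c not in removed]


def one_hot_drop_last(values):
    """Index levels by descending frequency (ties alphabetical), then one-hot
    encode, dropping the last (least frequent) level as the reference."""
    counts = Counter(values)
    levels = sorted(counts, key=lambda v: (-counts[v], v))
    kept = levels[:-1]
    rows = [[1.0 if v == level else 0.0 for level in kept] for v in values]
    return kept, rows


if __name__ == "__main__":
    print(expand_dot_formula(["business_id", "city", "stars", "label"]))
    print(one_hot_drop_last(["Las Vegas", "Phoenix", "Las Vegas", "Toronto"]))
```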


### Creating a Train and Test set

In [7]:
# Creating a train and test set with a 70% / 30% split (randomSplit sizes are only approximately 70/30)
train, test = basetable_final.randomSplit([0.7, 0.3], seed=123)

print(train.count())
print(test.count())

13280
5738
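`randomSplit` does not cut the data at exactly 70%. Each row is assigned by a seeded random draw, so the realised sizes only approximate the weights: here 13280 / 19018 ≈ 69.8% of rows land in train. The sketch below illustrates the idea in pure Python under a simplifying assumption: one uniform draw per row, compared against the cumulative normalised weights. Spark's implementation samples per partition, so the exact rows chosen will differ.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to exactly one split by comparing a seeded uniform draw
    with the cumulative (normalised) weights."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        # First split whose upper boundary exceeds the draw; the last split absorbs rounding
        idx = next((i for i, b in enumerate(bounds) if u < b), len(bounds) - 1)
        splits[idx].append(row)
    return splits


if __name__ == "__main__":
    train_rows, test_rows = random_split(range(19018), [0.7, 0.3], seed=123)
    print(len(train_rows), len(test_rows), round(len(train_rows) / 19018, 3))
```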


In [8]:
# Selecting only the features and the label columns for modeling
train = train.select('features', 'label')
train.show(3)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(7621,[0,1,2,3,4,...|  0.0|
|(7621,[0,1,4,5,6,...|  0.0|
|(7621,[0,1,3,4,5,...|  0.0|
+--------------------+-----+
only showing top 3 rows
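The `features` column holds Spark sparse vectors, printed as `(size, [indices], [values])`. For example, `(7621,[0,1,2,3,4,...` is a 7621-dimensional vector in which only the listed positions are stored. This is why the wide one-hot and text features stay cheap to hold. Below is a small plain-Python sketch of that representation. `to_dense` is a hypothetical helper; in PySpark, `SparseVector.toArray()` does the equivalent.

```python
def to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse vector into a dense list."""
    if len(indices) != len(values):
        raise ValueError("indices and values must have the same length")
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


if __name__ == "__main__":
    # A 6-dimensional vector whose stored entries sit at positions 0, 1 and 4
    print(to_dense(6, [0, 1, 4], [3.5, 1.0, 2.0]))
```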



In [9]:
# Selecting only the features and the label columns for modeling
test = test.select('features', 'label')
test.show(3)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(7621,[0,1,4,6,7,...|  1.0|
|(7621,[0,1,2,3,4,...|  1.0|
|(7621,[0,1,2,3,4,...|  0.0|
+--------------------+-----+
only showing top 3 rows



## Modeling using Random Forest

In [11]:
# Hyperparameter tuning for the random forest pipeline
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier

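#Define pipeline
# maxBins must be >= the number of categories of every categorical feature (such as the indexed postal codes)
rf = RandomForestClassifier(maxBins=5000)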
pipe_rf = Pipeline().setStages([rf])

#Set param grid
rf_params = ParamGridBuilder()\
  .addGrid(rf.numTrees, [150, 300, 500])\
  .build()

# Evaluator: uses the max(AUC) by default for final model
evaluator = BinaryClassificationEvaluator()

#Cross-validation of entire pipeline
cv_rf = CrossValidator()\
  .setEstimator(pipe_rf)\
  .setEstimatorParamMaps(rf_params)\
  .setEvaluator(evaluator)\
  .setNumFolds(10) # 10-fold cross validation

#Run cross-validation. Spark automatically saves the best solution as the main model.
cv_rf_model = cv_rf.fit(train)
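`CrossValidator` works in four steps:

1. It splits the training set into k folds.
2. It fits every parameter map on k-1 folds and scores it on the held-out fold.
3. It averages the k scores for each map.
4. It refits the best map on the whole training set.

With 3 values of `numTrees` and 10 folds, that is 30 forest fits plus one refit. Below is a minimal pure-Python sketch of that loop. The caller-supplied `fit` and `score` callables are hypothetical stand-ins for an estimator and an evaluator. Unlike Spark, which assigns rows to folds randomly, this sketch assigns them by position for simplicity.

```python
def k_fold_select(rows, param_grid, k, fit, score):
    """Return (best_params, best_mean_score, model_refitted_on_all_rows)."""
    folds = [rows[i::k] for i in range(k)]  # positional folds for simplicity
    best_params, best_mean = None, float("-inf")
    for params in param_grid:
        fold_scores = []
        for i in range(k):
            training = [r for j, fold in enumerate(folds) if j != i for r in fold]
            model = fit(training, params)
            fold_scores.append(score(model, folds[i]))
        mean = sum(fold_scores) / k
        if mean > best_mean:  # larger is better, as for AUC
            best_params, best_mean = params, mean
    return best_params, best_mean, fit(rows, best_params)


def fit_threshold(training_rows, params):
    """Toy 'model': just the threshold from the parameter map."""
    return params["threshold"]


def score_accuracy(threshold, rows):
    """Accuracy of predicting 1 when x >= threshold."""
    return sum((x >= threshold) == bool(y) for x, y in rows) / len(rows)


if __name__ == "__main__":
    data = [(x, 1 if x >= 5 else 0) for x in range(10)]
    grid = [{"threshold": t} for t in (2, 5, 8)]
    print(k_fold_select(data, grid, 5, fit_threshold, score_accuracy))
```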

In [12]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

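# Predicting on the test set and keeping only the prediction and label columns
preds = cv_rf_model.transform(test).select("prediction", "label")
preds.show(10)

# Get model performance on the test set. The hard 0/1 prediction is passed as the score,
# so both areas below describe a single threshold rather than the full curves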
out = preds.rdd.map(lambda x: (float(x[0]), float(x[1])))
metrics = BinaryClassificationMetrics(out)

print(metrics.areaUnderPR) # area under precision/recall curve
print(metrics.areaUnderROC) # area under Receiver Operating Characteristic curve

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
+----------+-----+
only showing top 10 rows

0.8196477215125482
0.8352864757192904
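The two areas above are computed from the 0/1 `prediction` column rather than from a probability. The ROC "curve" therefore has a single operating point, and its area reduces to (TPR + TNR) / 2, i.e. balanced accuracy. Below is a minimal pure-Python sketch of ROC AUC as the probability that a random positive outranks a random negative (ties count one half). It shows the difference on toy data.

To get the threshold-free AUC in Spark, one option is to evaluate the full prediction DataFrame: `BinaryClassificationEvaluator(metricName="areaUnderROC").evaluate(cv_rf_model.transform(test))`. This reads the `rawPrediction` column by default.

```python
def roc_auc(scores, labels):
    """ROC AUC as P(score of a random positive > score of a random negative),
    counting ties as 1/2. Quadratic in the data size; for illustration only."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            wins += 1.0 if p > n else 0.5 if p == n else 0.0
    return wins / (len(pos) * len(neg))


def balanced_accuracy(preds, labels):
    """(TPR + TNR) / 2 for hard 0/1 predictions."""
    tp = sum(1 for p, y in zip(preds, labels) if p == 1 and y == 1)
    tn = sum(1 for p, y in zip(preds, labels) if p == 0 and y == 0)
    n_pos = sum(labels)
    n_neg = len(labels) - n_pos
    return (tp / n_pos + tn / n_neg) / 2


if __name__ == "__main__":
    labels = [0, 0, 0, 1, 1, 1]
    probs = [0.1, 0.4, 0.6, 0.35, 0.8, 0.9]
    preds = [1 if p >= 0.5 else 0 for p in probs]
    print(roc_auc(probs, labels))  # uses the full ranking of the scores
    print(roc_auc(preds, labels), balanced_accuracy(preds, labels))  # single threshold
```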


In [13]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(preds)
print("Test Error = %g " % (1.0 - accuracy))

Test Error = 0.119728 


In [14]:
# ROC curve on the train data

display(cv_rf_model, train, "ROC")

[ROC curve plot (training data) not preserved in this export]

In [15]:
# ROC curve on the test data

display(cv_rf_model, test, "ROC")

[ROC curve plot (test data) not preserved in this export]
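The `display(..., "ROC")` cells are meant to draw the ROC curve: the (false positive rate, true positive rate) pairs obtained by sweeping a threshold over the predicted scores. Below is a minimal pure-Python sketch of how those points are built. It assumes the scores are probabilities of the positive class and emits one point per distinct score, so tied scores are handled together. The area under these points (trapezoidal rule) is the ROC AUC.

```python
def roc_points(scores, labels):
    """(FPR, TPR) points from the strictest to the loosest threshold."""
    n_pos = sum(labels)
    n_neg = len(labels) - n_pos
    points = [(0.0, 0.0)]
    tp = fp = 0
    # Walk from the highest to the lowest score; each distinct score is a threshold
    ranked = sorted(zip(scores, labels), key=lambda t: t[0], reverse=True)
    for i, (score, label) in enumerate(ranked):
        if label == 1:
            tp += 1
        else:
            fp += 1
        # Only emit a point once every example tied at this score has been counted
        if i == len(ranked) - 1 or ranked[i + 1][0] != score:
            points.append((fp / n_neg, tp / n_pos))
    return points


def trapezoid_area(points):
    """Area under a piecewise-linear curve given as x-sorted (x, y) points."""
    return sum((x1 - x0) * (y0 + y1) / 2
               for (x0, y0), (x1, y1) in zip(points, points[1:]))


if __name__ == "__main__":
    labels = [0, 0, 0, 1, 1, 1]
    probs = [0.1, 0.4, 0.6, 0.35, 0.8, 0.9]
    pts = roc_points(probs, labels)
    print(pts)
    print(trapezoid_area(pts))
```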

### Calculating Feature Importance using Random Forest

In [26]:
from pyspark.ml.classification import RandomForestClassifier

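# A separate, untuned forest fitted only to read its feature importances (not the cross-validated model)
rf = RandomForestClassifier(numTrees=20, maxDepth=30, labelCol="label", seed=42, maxBins=5000)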
model = rf.fit(train)
predictions = model.transform(test)

In [None]:
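# Function for extracting feature importances and mapping them back to feature names
import pandas as pd

def ExtractFeatureImp(featureImp, dataset, featuresCol):
    # The features column metadata groups attributes by type (numeric, binary, nominal)
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    # Look up each attribute's importance by its position in the feature vector
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))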

In [30]:
import pandas as pd
features_list = ExtractFeatureImp(model.featureImportances, predictions, rf.getFeaturesCol())

In [31]:
features_list.head(10)

| | idx | name | vals | score |
|---|---|---|---|---|
| 7599 | 27 | postal_code_Ind | [89109, 85251, 89119, 85260, 85281, 89102, 891... | 0.074763 |
| 7609 | 37 | attributes_HasTV_dum_-1 | | 0.068454 |
| 7614 | 42 | attributes_RestaurantsGoodForGroups_dum_1 | | 0.060426 |
| 7607 | 35 | attributes_GoodForKids_dum_-1 | | 0.048551 |
| 7610 | 38 | attributes_HasTV_dum_1 | | 0.042574 |
| 7613 | 41 | attributes_RestaurantsGoodForGroups_dum_-1 | | 0.041207 |
| 7615 | 43 | attributes_RestaurantsReservations_dum_-1 | | 0.041174 |
| 27 | 51 | categories_txt_2 | | 0.039653 |
| 28 | 52 | categories_txt_3 | | 0.025071 |
| 7616 | 44 | attributes_RestaurantsReservations_dum_0 | | 0.021813 |
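Several top entries are levels of the same dummy-coded attribute, for example `attributes_HasTV_dum_-1` and `attributes_HasTV_dum_1`. The importance of the attribute as a whole is therefore split across rows. Spark's random-forest importances are normalised to sum to 1, so the levels can be summed back per attribute.

Below is a minimal sketch that assumes level suffixes always follow `_dum_`, as in this table. `aggregate_importance` and `base_feature` are hypothetical helpers, and the scores are copied from the top-10 output above. Because only ten rows are used, the totals are lower bounds.

```python
import re
from collections import defaultdict


def base_feature(name):
    """Hypothetical helper: strip a dummy level suffix ('..._dum_-1' -> '..._dum')."""
    return re.sub(r"(_dum)_-?\d+$", r"\1", name)


def aggregate_importance(rows):
    """Sum importance scores per base feature, sorted from most to least important."""
    totals = defaultdict(float)
    for name, score in rows:
        totals[base_feature(name)] += score
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)


if __name__ == "__main__":
    top10 = [
        ("postal_code_Ind", 0.074763),
        ("attributes_HasTV_dum_-1", 0.068454),
        ("attributes_RestaurantsGoodForGroups_dum_1", 0.060426),
        ("attributes_GoodForKids_dum_-1", 0.048551),
        ("attributes_HasTV_dum_1", 0.042574),
        ("attributes_RestaurantsGoodForGroups_dum_-1", 0.041207),
        ("attributes_RestaurantsReservations_dum_-1", 0.041174),
        ("categories_txt_2", 0.039653),
        ("categories_txt_3", 0.025071),
        ("attributes_RestaurantsReservations_dum_0", 0.021813),
    ]
    for feature, score in aggregate_importance(top10):
        print(f"{feature:45s} {score:.6f}")
```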


## Modeling with Logistic Regression

In [32]:
#Hyperparameter tuning for pipeline logistic regression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

#Define pipeline
lr = LogisticRegression()
pipe_lr = Pipeline().setStages([lr])

#Set param grid
lr_params = ParamGridBuilder()\
  .addGrid(lr.regParam, [0.1, 0.01])\
  .addGrid(lr.maxIter, [50, 100,150])\
  .build()

# Evaluator: uses the max(AUC) by default for final model
evaluator = BinaryClassificationEvaluator()

#Cross-validation of entire pipeline
cv_lr = CrossValidator()\
  .setEstimator(pipe_lr)\
  .setEstimatorParamMaps(lr_params)\
  .setEvaluator(evaluator)\
  .setNumFolds(10) # 10-fold cross validation

#Run cross-validation. Spark automatically saves the best solution as the main model.
cv_lr_model = cv_lr.fit(train)
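`regParam` is the overall regularisation strength λ. `elasticNetParam` (α) is left at its default of 0, so the penalty is pure L2, and the grid above compares λ = 0.1 with λ = 0.01. The objective below follows the elastic-net formulation in the Spark MLlib documentation. Spark also standardises the features by default before applying the penalty, and the intercept b is not penalised.

$$
\min_{w,\,b}\; \frac{1}{n}\sum_{i=1}^{n} \log\!\left(1 + e^{-\tilde{y}_i\,(w^\top x_i + b)}\right) + \lambda\left(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right), \qquad \tilde{y}_i = 2y_i - 1 \in \{-1, +1\}
$$

With α = 0, the penalty reduces to (λ/2)‖w‖²₂. `maxIter` only caps the number of optimiser iterations. It changes the fitted model only if the solver has not converged within the smaller limit.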

In [33]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

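# Predicting on the test set and keeping only the prediction and label columns
preds = cv_lr_model.transform(test).select("prediction", "label")
preds.show(10)

# Get model performance on the test set. The hard 0/1 prediction is passed as the score,
# so both areas below describe a single threshold rather than the full curves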
out = preds.rdd.map(lambda x: (float(x[0]), float(x[1])))
metrics = BinaryClassificationMetrics(out)

print(metrics.areaUnderPR) # area under precision/recall curve
print(metrics.areaUnderROC) # area under Receiver Operating Characteristic curve

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       0.0|  1.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
+----------+-----+
only showing top 10 rows

0.871934101798624
0.9245368545335323


In [34]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(preds)
print("Test Error = %g " % (1.0 - accuracy))

Test Error = 0.0655281 


In [35]:
# ROC curve on the train data

display(cv_lr_model, train, "ROC")

[ROC curve plot (training data) not preserved in this export]

In [36]:
# ROC curve on the test data

display(cv_lr_model, test, "ROC")

[ROC curve plot (test data) not preserved in this export]

## Modeling with Decision Tree

In [38]:
#Hyperparameter tuning for pipeline Decision Tree
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier

#Define pipeline
dt = DecisionTreeClassifier(maxBins=5000)
pipe_dt = Pipeline().setStages([dt])

#Set param grid
dt_params = ParamGridBuilder()\
  .addGrid(dt.maxDepth, [10, 20, 30])\
  .build()

# Evaluator: uses the max(AUC) by default for final model
evaluator = BinaryClassificationEvaluator()

#Cross-validation of entire pipeline
cv_dt = CrossValidator()\
  .setEstimator(pipe_dt)\
  .setEstimatorParamMaps(dt_params)\
  .setEvaluator(evaluator)\
  .setNumFolds(10) #10-fold cross validation

#Run cross-validation. Spark automatically saves the best solution as the main model.
cv_dt_model = cv_dt.fit(train)
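`DecisionTreeClassifier` chooses each split by the largest decrease in impurity (Gini by default). `maxDepth` caps how many such splits can be stacked. Deeper trees fit the training data more closely and risk overfitting. Below is a minimal pure-Python sketch of Gini impurity and of the size-weighted impurity decrease of one candidate split, on toy labels only.

```python
from collections import Counter


def gini(labels):
    """Gini impurity 1 - sum(p_k^2) of a list of class labels."""
    n = len(labels)
    if n == 0:
        return 0.0
    return 1.0 - sum((c / n) ** 2 for c in Counter(labels).values())


def impurity_decrease(parent, left, right):
    """Parent impurity minus the size-weighted impurity of the two children."""
    n = len(parent)
    weighted = len(left) / n * gini(left) + len(right) / n * gini(right)
    return gini(parent) - weighted


if __name__ == "__main__":
    parent = [0, 0, 0, 1, 1, 1]
    print(gini(parent))  # 0.5
    print(impurity_decrease(parent, [0, 0, 0], [1, 1, 1]))  # perfect split
    print(impurity_decrease(parent, [0, 0, 1], [0, 1, 1]))  # weaker split
```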

In [39]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

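# Predicting on the test set and keeping only the prediction and label columns
preds = cv_dt_model.transform(test).select("prediction", "label")
preds.show(10)

# Get model performance on the test set. The hard 0/1 prediction is passed as the score,
# so both areas below describe a single threshold rather than the full curves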
out = preds.rdd.map(lambda x: (float(x[0]), float(x[1])))
metrics = BinaryClassificationMetrics(out)

print(metrics.areaUnderPR) # area under precision/recall curve
print(metrics.areaUnderROC) # area under Receiver Operating Characteristic curve

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  0.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  0.0|
|       1.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
+----------+-----+
only showing top 10 rows

0.6452999518407262
0.8393633900578056


In [40]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(preds)
print("Test Error = %g " % (1.0 - accuracy))

Test Error = 0.181945 


In [41]:
# ROC curve on the train data

display(cv_dt_model, train, "ROC")

[ROC curve plot (training data) not preserved in this export]

In [42]:
# ROC curve on the test data

display(cv_dt_model, test, "ROC")

[ROC curve plot (test data) not preserved in this export]

## Modeling with Gradient Boosting

In [16]:
# Hyperparameter tuning for the gradient-boosted trees pipeline
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import GBTClassifier

#Define pipeline
gbt = GBTClassifier(maxBins=5000)
pipe_gbt = Pipeline().setStages([gbt])

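# Empty param grid: build() returns a single empty param map, so the cross-validation
# below only evaluates the default GBTClassifier settings (no tuning)
gbt_params = ParamGridBuilder()\
             .build()

# Tuning grid, currently disabled (4 param maps x 10 folds = 40 fits):
# gbt_params = ParamGridBuilder()\
#              .addGrid(gbt.maxDepth, [10, 20])\
#              .addGrid(gbt.maxIter, [50, 100])\
#              .build()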


# Evaluator: uses the max(AUC) by default for final model
evaluator = BinaryClassificationEvaluator()

#Cross-validation of entire pipeline
cv_gbt = CrossValidator()\
  .setEstimator(pipe_gbt)\
  .setEstimatorParamMaps(gbt_params)\
  .setEvaluator(evaluator)\
  .setNumFolds(10) #10-fold cross validation



#Run cross-validation. Spark automatically saves the best solution as the main model.
cv_gbt_model = cv_gbt.fit(train)
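`ParamGridBuilder` builds the Cartesian product of the values added with `addGrid`. With no `addGrid` calls, `build()` returns one empty parameter map. This cell therefore cross-validates only the default `GBTClassifier` (10 fold fits plus a final refit) rather than tuning it. The commented-out grid would give 2 × 2 = 4 candidates and 40 fold fits. Below is a pure-Python sketch of that product. `build_grid` is a hypothetical stand-in, not the PySpark API.

```python
from itertools import product


def build_grid(grid):
    """Cartesian product of {param_name: [values]} as a list of param dicts."""
    names = list(grid)
    return [dict(zip(names, combo)) for combo in product(*(grid[n] for n in names))]


if __name__ == "__main__":
    print(build_grid({}))  # a single empty map: only the defaults are evaluated
    tuned = build_grid({"maxDepth": [10, 20], "maxIter": [50, 100]})
    print(len(tuned), tuned[0])
    print("fold fits with 10 folds:", 10 * len(tuned))
```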

In [17]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

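# Predicting on the test set and keeping only the prediction and label columns
preds = cv_gbt_model.transform(test).select("prediction", "label")
preds.show(10)

# Get model performance on the test set. The hard 0/1 prediction is passed as the score,
# so both areas below describe a single threshold rather than the full curves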
out = preds.rdd.map(lambda x: (float(x[0]), float(x[1])))
metrics = BinaryClassificationMetrics(out)

print(metrics.areaUnderPR) # area under precision/recall curve
print(metrics.areaUnderROC) # area under Receiver Operating Characteristic curve

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  0.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       1.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
+----------+-----+
only showing top 10 rows

0.646784754500806
0.8464686044595405


In [18]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(preds)
print("Test Error = %g " % (1.0 - accuracy))

Test Error = 0.180028 


In [19]:
# ROC curve on the train data

display(cv_gbt_model, train, "ROC")

[ROC curve plot (training data) not preserved in this export]

In [20]:
# ROC curve on the test data

display(cv_gbt_model, test, "ROC")

[ROC curve plot (test data) not preserved in this export]
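Collecting the test-set figures reported above in one place makes the four models easier to compare. The small sketch below ranks them by any one metric. The numbers are copied from the outputs of this notebook. Because the two areas were computed from hard 0/1 predictions, they are single-threshold values rather than full-curve AUCs.

```python
# Test-set results as printed by the four modeling sections of this notebook
results = {
    "Random Forest": {"area_under_pr": 0.8196477215125482,
                      "area_under_roc": 0.8352864757192904, "test_error": 0.119728},
    "Logistic Regression": {"area_under_pr": 0.871934101798624,
                            "area_under_roc": 0.9245368545335323, "test_error": 0.0655281},
    "Decision Tree": {"area_under_pr": 0.6452999518407262,
                      "area_under_roc": 0.8393633900578056, "test_error": 0.181945},
    "Gradient Boosting": {"area_under_pr": 0.646784754500806,
                          "area_under_roc": 0.8464686044595405, "test_error": 0.180028},
}


def rank_by(metric, lower_is_better=False):
    """Model names ordered from best to worst on one metric."""
    return sorted(results, key=lambda m: results[m][metric], reverse=not lower_is_better)


if __name__ == "__main__":
    for name in rank_by("test_error", lower_is_better=True):
        r = results[name]
        print(f"{name:20s} error={r['test_error']:.4f} accuracy={1 - r['test_error']:.4f} "
              f"roc={r['area_under_roc']:.4f} pr={r['area_under_pr']:.4f}")
```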