In [1]:
import findspark
findspark.init()

In [2]:
import os
import pyspark
from pyspark.sql import SparkSession, functions as F, types as T
import pandas as pd

spark = SparkSession.builder.appName('YELP').getOrCreate()

In [3]:
bus_df = spark.read.json(os.getcwd()+'/data/yelp_academic_dataset_business.json')
bus_df.columns

['address',
 'attributes',
 'business_id',
 'categories',
 'city',
 'hours',
 'is_open',
 'latitude',
 'longitude',
 'name',
 'postal_code',
 'review_count',
 'stars',
 'state']

In [4]:
# Exploring postal codes
bus_df.withColumn('code_len',F.length(F.col('postal_code'))) \
    .groupby('code_len').count().orderBy(F.col('code_len')).show()

+--------+------+
|code_len| count|
+--------+------+
|       0|   509|
|       3|  1779|
|       5|153583|
|       6|    12|
|       7| 53507|
|       8|     3|
+--------+------+



In [5]:
# Filtering Canadian cities
bus_df.filter(F.length(F.col('postal_code')).isin([3,6,7])) \
    .groupby('city').count().orderBy(F.col('count'),ascending=False).show()

+-------------+-----+
|         city|count|
+-------------+-----+
|      Toronto|20247|
|      Calgary| 8314|
|     Montréal| 6958|
|  Mississauga| 3510|
|      Markham| 1958|
|   North York| 1319|
|  Scarborough| 1244|
|Richmond Hill| 1151|
|     Brampton| 1141|
|      Vaughan| 1043|
|    Etobicoke|  841|
|        Laval|  472|
|    Thornhill|  434|
|    Newmarket|  411|
|     Oakville|  400|
|    Pickering|  369|
|         Ajax|  341|
|       Whitby|  322|
|       Aurora|  280|
|   Woodbridge|  264|
+-------------+-----+
only showing top 20 rows



In [6]:
# 10 Canadian cities with most number of businesses
cities = bus_df.filter(F.length(F.col('postal_code')).isin([3,6,7])) \
    .groupby('city').count().orderBy(F.col('count'),ascending=False)[['city']].limit(10)
cities.show()

+-------------+
|         city|
+-------------+
|      Toronto|
|      Calgary|
|     Montréal|
|  Mississauga|
|      Markham|
|   North York|
|  Scarborough|
|Richmond Hill|
|     Brampton|
|      Vaughan|
+-------------+



In [7]:
# Filter business data
print("Before filtering:", bus_df.count())
bus_filt = bus_df.join(cities,'city','inner')
print("After filtering:", bus_filt.count())
del bus_df

Before filtering: 209393
After filtering: 47110


In [8]:
rev_df = spark.read.json(os.getcwd()+'/data/yelp_academic_dataset_review.json')
rev_df.columns

['business_id',
 'cool',
 'date',
 'funny',
 'review_id',
 'stars',
 'text',
 'useful',
 'user_id']

In [9]:
# Filter review data
print("Before filtering:", rev_df.count())
rev_filt = rev_df.join(bus_filt[['business_id']],'business_id','inner')
print("After filtering:", rev_filt.count())
del rev_df

Before filtering: 8021122
After filtering: 1124670


In [10]:
user_df = spark.read.json(os.getcwd()+'/data/yelp_academic_dataset_user.json')
user_df.columns

['average_stars',
 'compliment_cool',
 'compliment_cute',
 'compliment_funny',
 'compliment_hot',
 'compliment_list',
 'compliment_more',
 'compliment_note',
 'compliment_photos',
 'compliment_plain',
 'compliment_profile',
 'compliment_writer',
 'cool',
 'elite',
 'fans',
 'friends',
 'funny',
 'name',
 'review_count',
 'useful',
 'user_id',
 'yelping_since']

In [11]:
# Filter user data
print("Before filtering:", user_df.count())
user_filt = user_df.join(rev_filt[['user_id']],'user_id','inner')
print("After filtering:", user_filt.count())
del user_df

Before filtering: 1968703
After filtering: 1124670


In [12]:
tip_df = spark.read.json(os.getcwd()+'/data/yelp_academic_dataset_tip.json')
tip_df.columns

['business_id', 'compliment_count', 'date', 'text', 'user_id']

In [13]:
# Filter tip data
print("Before filtering:", tip_df.count())
tip_filt = tip_df.join(bus_filt[['business_id']],'business_id','inner')
print("After filtering:", tip_filt.count())
del tip_df

Before filtering: 1320761
After filtering: 155110


In [14]:
check_df = spark.read.json(os.getcwd()+'/data/yelp_academic_dataset_checkin.json')
check_df.columns

['business_id', 'date']

In [15]:
# Filter tip data
print("Before filtering:", check_df.count())
check_filt = check_df.join(bus_filt[['business_id']],'business_id','inner')
print("After filtering:", check_filt.count())
del check_df

Before filtering: 175187
After filtering: 40632


In [16]:
filt_datapath = os.getcwd()+'/filtered_data'
if not os.path.exists(filt_datapath):
    os.makedirs(filt_datapath)

bus_filt.write.parquet(filt_datapath+'/business',mode='overwrite')
rev_filt.write.parquet(filt_datapath+'/review',mode='overwrite')
user_filt.write.parquet(filt_datapath+'/user',mode='overwrite')
tip_filt.write.parquet(filt_datapath+'/tip',mode='overwrite')
check_filt.write.parquet(filt_datapath+'/checkin',mode='overwrite')