In [2]:
import findspark
findspark.init()
findspark.find()
import pyspark

In [4]:
from pyspark.sql import SparkSession

warehouseLocation='/user/hive/warehouse'

spark = SparkSession.builder \
.appName('EDA') \
.master('local') \
.config("spark.sql.warehouse.dir", warehouseLocation) \
.enableHiveSupport() \
.getOrCreate()

In [5]:
spark

In [6]:
review = spark.read.json('hdfs://0.0.0.0:19000/data/review.json')

In [9]:
review.dtypes

[('business_id', 'string'),
 ('cool', 'bigint'),
 ('date', 'string'),
 ('funny', 'bigint'),
 ('review_id', 'string'),
 ('stars', 'double'),
 ('text', 'string'),
 ('useful', 'bigint'),
 ('user_id', 'string')]

In [10]:
review.take(1)

[Row(business_id='ujmEBvifdJM6h6RLv4wQIg', cool=0, date='2013-05-07 04:34:36', funny=1, review_id='Q1sbwvVQXV2734tPgoKj4Q', stars=1.0, text='Total bill for this horrible service? Over $8Gs. These crooks actually had the nerve to charge us $69 for 3 pills. I checked online the pills can be had for 19 cents EACH! Avoid Hospital ERs at all costs.', useful=6, user_id='hG7b0MtEbXx5QzbzE6C_VA')]

In [22]:
review.createOrReplaceTempView("review")
spark.sql("select count(distinct business_id) as business, count(distinct review_id) as reviews, count(distinct user_id) as users, count(distinct date) as dates from review where business_id is null or review_id is null or user_id is null").collect()

[Row(business=0, reviews=0, users=0, dates=0)]

In [24]:
spark.sql("select distinct min(date), max(date) from review").collect()

[Row(min(date)='2004-10-12 10:13:32', max(date)='2018-11-14 18:13:26')]

In [26]:
spark.sql("select extract(year from date) as years, count(*) from review group by years order by years").collect()

[Row(years=2004, count(1)=13),
 Row(years=2005, count(1)=876),
 Row(years=2006, count(1)=5081),
 Row(years=2007, count(1)=21389),
 Row(years=2008, count(1)=57347),
 Row(years=2009, count(1)=101173),
 Row(years=2010, count(1)=187387),
 Row(years=2011, count(1)=302867),
 Row(years=2012, count(1)=367090),
 Row(years=2013, count(1)=491294),
 Row(years=2014, count(1)=704862),
 Row(years=2015, count(1)=952400),
 Row(years=2016, count(1)=1098786),
 Row(years=2017, count(1)=1217673),
 Row(years=2018, count(1)=1177662)]

In [27]:
spark.sql("select stars, count(*) from review group by stars order by stars").collect()

[Row(stars=1.0, count(1)=1002159),
 Row(stars=2.0, count(1)=542394),
 Row(stars=3.0, count(1)=739280),
 Row(stars=4.0, count(1)=1468985),
 Row(stars=5.0, count(1)=2933082)]

In [28]:
business = spark.read.json('hdfs://0.0.0.0:19000/data/business.json')

In [29]:
business.dtypes

[('address', 'string'),
 ('attributes',
  'struct<AcceptsInsurance:string,AgesAllowed:string,Alcohol:string,Ambience:string,BYOB:string,BYOBCorkage:string,BestNights:string,BikeParking:string,BusinessAcceptsBitcoin:string,BusinessAcceptsCreditCards:string,BusinessParking:string,ByAppointmentOnly:string,Caters:string,CoatCheck:string,Corkage:string,DietaryRestrictions:string,DogsAllowed:string,DriveThru:string,GoodForDancing:string,GoodForKids:string,GoodForMeal:string,HairSpecializesIn:string,HappyHour:string,HasTV:string,Music:string,NoiseLevel:string,Open24Hours:string,OutdoorSeating:string,RestaurantsAttire:string,RestaurantsCounterService:string,RestaurantsDelivery:string,RestaurantsGoodForGroups:string,RestaurantsPriceRange2:string,RestaurantsReservations:string,RestaurantsTableService:string,RestaurantsTakeOut:string,Smoking:string,WheelchairAccessible:string,WiFi:string>'),
 ('business_id', 'string'),
 ('categories', 'string'),
 ('city', 'string'),
 ('hours',
  'struct<Friday:st

In [30]:
business.take(1)

[Row(address='2818 E Camino Acequia Drive', attributes=Row(AcceptsInsurance=None, AgesAllowed=None, Alcohol=None, Ambience=None, BYOB=None, BYOBCorkage=None, BestNights=None, BikeParking=None, BusinessAcceptsBitcoin=None, BusinessAcceptsCreditCards=None, BusinessParking=None, ByAppointmentOnly=None, Caters=None, CoatCheck=None, Corkage=None, DietaryRestrictions=None, DogsAllowed=None, DriveThru=None, GoodForDancing=None, GoodForKids='False', GoodForMeal=None, HairSpecializesIn=None, HappyHour=None, HasTV=None, Music=None, NoiseLevel=None, Open24Hours=None, OutdoorSeating=None, RestaurantsAttire=None, RestaurantsCounterService=None, RestaurantsDelivery=None, RestaurantsGoodForGroups=None, RestaurantsPriceRange2=None, RestaurantsReservations=None, RestaurantsTableService=None, RestaurantsTakeOut=None, Smoking=None, WheelchairAccessible=None, WiFi=None), business_id='1SWheh84yJXfytovILXOAQ', categories='Golf, Active Life', city='Phoenix', hours=None, is_open=0, latitude=33.5221425, longit

In [34]:
business.createOrReplaceTempView("business")
spark.sql("select count(distinct business_id) as total_business, count(distinct categories) as categories, \
count(*) as total_table_rows from business").collect()

[Row(total_business=192609, categories=93385, total_table_rows=192609)]

In [35]:
spark.sql("select categories, count(*) counts from business group by categories order by counts desc limit 20").collect()

[Row(categories='Restaurants, Pizza', counts=1042),
 Row(categories='Nail Salons, Beauty & Spas', counts=1031),
 Row(categories='Pizza, Restaurants', counts=993),
 Row(categories='Beauty & Spas, Nail Salons', counts=947),
 Row(categories='Food, Coffee & Tea', counts=888),
 Row(categories='Mexican, Restaurants', counts=885),
 Row(categories='Coffee & Tea, Food', counts=865),
 Row(categories='Restaurants, Mexican', counts=853),
 Row(categories='Chinese, Restaurants', counts=840),
 Row(categories='Hair Salons, Beauty & Spas', counts=831),
 Row(categories='Beauty & Spas, Hair Salons', counts=819),
 Row(categories='Restaurants, Chinese', counts=789),
 Row(categories='Automotive, Auto Repair', counts=585),
 Row(categories='Auto Repair, Automotive', counts=534),
 Row(categories='Food, Grocery', counts=492),
 Row(categories='Grocery, Food', counts=491),
 Row(categories=None, counts=482),
 Row(categories='Restaurants, Italian', counts=474),
 Row(categories='Italian, Restaurants', counts=446),
 

In [39]:
spark.sql("select categories, count(*) counts from business \
where lower(categories) like '%restaurant%' \
group by categories order by counts desc limit 20").collect()

[Row(categories='Restaurants, Pizza', counts=1042),
 Row(categories='Pizza, Restaurants', counts=993),
 Row(categories='Mexican, Restaurants', counts=885),
 Row(categories='Restaurants, Mexican', counts=853),
 Row(categories='Chinese, Restaurants', counts=840),
 Row(categories='Restaurants, Chinese', counts=789),
 Row(categories='Restaurants, Italian', counts=474),
 Row(categories='Italian, Restaurants', counts=446),
 Row(categories='American (Traditional), Restaurants', counts=282),
 Row(categories='Restaurants, American (Traditional)', counts=272),
 Row(categories='Vietnamese, Restaurants', counts=272),
 Row(categories='Restaurants, Vietnamese', counts=264),
 Row(categories='Restaurants, Thai', counts=261),
 Row(categories='Restaurants', counts=256),
 Row(categories='Thai, Restaurants', counts=254),
 Row(categories='American (New), Restaurants', counts=240),
 Row(categories='Japanese, Restaurants', counts=235),
 Row(categories='Sandwiches, Restaurants', counts=227),
 Row(categories='

In [40]:
spark.sql("select count(distinct business_id) total_restaurant_businesses from business \
where lower(categories) like '%restaurant%' ").collect()

[Row(total_restaurant_businesses=59387)]

In [41]:
spark.sql("select business_id, review_count from business \
where lower(categories) like '%restaurant%' order by review_count desc limit 20").collect()

[Row(business_id='4JNXUYY8wbaaDmk3BPzlWw', review_count=8348),
 Row(business_id='RESDUcs7fIiihp38-d6_6g', review_count=8339),
 Row(business_id='K7lWdNUhCbcnEvI0NhGewg', review_count=6708),
 Row(business_id='f4x1YBxkLrZg652xt2KR5g', review_count=5763),
 Row(business_id='cYwJA2A6I12KNkm2rtXd5g', review_count=5484),
 Row(business_id='DkYS3arLOhA8si5uUEmHOw', review_count=5075),
 Row(business_id='2weQS-RnoOBhb1KsHKyoSQ', review_count=4400),
 Row(business_id='5LNZ67Yw9RD6nf4_UhXOjw', review_count=4322),
 Row(business_id='iCQpiavjjPzJ5_3gPD5Ebg', review_count=4286),
 Row(business_id='ujHiaprwCQ5ewziu0Vi9rw', review_count=4227),
 Row(business_id='AV6weBrZFFBfRGCbcRGO4g', review_count=4117),
 Row(business_id='KskYqH1Bi7Z_61pH6Om8pg', review_count=3998),
 Row(business_id='El4FC8jcawUVgw_0EIcbaQ', review_count=3944),
 Row(business_id='eoHdUeQDNgQ6WYEnP2aiRw', review_count=3929),
 Row(business_id='rcaPajgKOJC2vo_l3xa42A', review_count=3859),
 Row(business_id='faPVqws-x-5k2CQKDNtHxw', review_count

In [49]:
business_reviews=spark.sql("select r.business_id, r.review_id, r.date, r.useful, r.stars, b.city, b.state, \
b.latitude, b.longitude, b.name, b.postal_code \
from review r \
inner join (select b.business_id, b.city, b.state, b.latitude, b.longitude, b.name, b.postal_code from business b \
where lower(categories) like '%restaurant%' \
order by review_count desc limit 20) b \
on r.business_id=b.business_id")
                    

In [50]:
business_reviews.head(1)

[Row(business_id='ujHiaprwCQ5ewziu0Vi9rw', review_id='f0B9-r14-bLudyu5S7aLhw', date='2013-12-07 00:14:06', useful=1, stars=1.0, city='Las Vegas', state='NV', latitude=36.11322, longitude=-115.17689, name='The Buffet at Bellagio', postal_code='89109')]

In [51]:
business_reviews.count()

96793

In [54]:
business_reviews.write.csv('/data/business_reviews_restaurants.csv')