# Yelp Dataset Challenge

## The average # of reviews and the average # of stars grouped by city and business category.




 We first import the dataframe and select the relevant columns using the select function.
Since each row has a group of categories we need to explode the categories first so that each row gets a single category, by which we later group. 

In [1]:
import pyspark as spark
sc = spark.SparkContext()
sql = spark.SQLContext(sc)
import pyspark.sql.functions as func



businessdf = sql.read.json('/Users/rohitsuvarna/Downloads/dataset/business.json')
businessdf = sql.read.json('/Users/rohitsuvarna/Downloads/dataset/business.json')
businessdf = businessdf.select('name','business_id','city','state','categories','stars','review_count')
businessdf = businessdf.withColumn("categories",func.explode(businessdf.categories))
result1 = businessdf.groupBy('city',"categories").agg(func.avg("stars") , func.avg("review_count"))
result1.show(10)
result1.repartition(1).write.csv("Q1.csv")
#this will write df to single csv instead of writing diff csv acc to partitions 

+----------------+------------------+------------------+------------------+
|            city|        categories|        avg(stars)| avg(review_count)|
+----------------+------------------+------------------+------------------+
|Richmond Heights|          Shopping|               3.5|              10.5|
|         Madison|  Laundry Services| 2.962962962962963| 6.851851851851852|
|          Elyria|           Doctors|               2.5|               3.0|
|            Mesa|Auto Customization|              4.25|12.857142857142858|
|         Phoenix|              Pets| 4.073369565217392| 21.04076086956522|
|         Toronto|Financial Services|           2.96875|          6.296875|
|           Tempe|   Hotels & Travel| 3.258771929824561| 29.19298245614035|
|        Surprise|          Shopping|3.5719424460431655|12.532374100719425|
|       Henderson|             Pizza|3.3532608695652173|  96.3804347826087|
|       Etobicoke|    Sporting Goods|             3.625| 6.583333333333333|
+-----------

###  Pivoting the business categories as columns, I show the average # stars for each category, by (city,state):

First we use the pivot function to transform the categories from row to columns.Then in the next step we group by
city,state. Since since too many columns to show, i have shown the resulting dataframe using few selected categries like 'food', 'chinese' and 'restaurants'

In [2]:
#Using businessdf from previous part
result2 = businessdf.groupBy('city','state').pivot('categories').avg('stars')
result2_sel_cols = result2.select('city','state','Food','Chinese','Restaurants')
result2_sel_cols.show(10)
result2.repartition(1).write.csv("Q2.csv")#this will write df to single csv instead of writing diff csv acc to partitions 

+--------------------+-----+------------------+------------------+------------------+
|                city|state|              Food|           Chinese|       Restaurants|
+--------------------+-----+------------------+------------------+------------------+
|                Mesa|   AZ| 3.551829268292683|3.2349397590361444|3.3077994428969357|
|St-Benoît de Mirabel|   QC|               4.0|              null|               4.0|
|      Cote-Saint-Luc|   QC|               5.0|              null|              3.75|
|  Mont-Saint-Hilaire|   QC|               3.0|              null|3.5714285714285716|
|         Monreoville|   PA|              null|              null|              null|
|            Citibank|   NV|              null|              null|              null|
|          Canonsburd|   PA|              null|              null|               1.0|
|    Centennial Hills|   NV|              null|              null|              null|
|         Scarborough|   ON|3.4651898734177213|3.28389

##  The average rank (# stars) for businesses that are ‘Mexican’ category, AND offer takeout: (e.g. "attributes": {"RestaurantsTakeOut": true,…})

First we import the business json and select the categories column and extract the ''attributes.RestaurantsTakeOut' as a column as well. Finally we filter the datafram so that it only has Mexican restaurants that offer takeout. Finally we find the average rank of these restaurants.

In [3]:
businessdf = sql.read.json('/Users/rohitsuvarna/Downloads/dataset/business.json')
businessdf = businessdf.withColumn("categories",func.explode(businessdf.categories))
businessdf = businessdf.select("name","business_id",'city','state','categories',\
                               'attributes.RestaurantsTakeOut','stars','review_count')
businessdf = businessdf.filter((businessdf.categories == 'Mexican') & (businessdf.RestaurantsTakeOut == True)) 

In [4]:
businessdf.show(10)

+--------------------+--------------------+-----------+-----+----------+------------------+-----+------------+
|                name|         business_id|       city|state|categories|RestaurantsTakeOut|stars|review_count|
+--------------------+--------------------+-----------+-----+----------+------------------+-----+------------+
|             Rocky's|HmI9nhgOkrXlUr6KZ...| Pittsburgh|   PA|   Mexican|              true|  3.0|          15|
|       El Pollo Loco|LDMCrFlGIFUN6L-FE...|  Las Vegas|   NV|   Mexican|              true|  3.0|          12|
|Don Ruben's Mexic...|wsyNO9Ac4gqGYTBfN...|   Glendale|   AZ|   Mexican|              true|  4.5|         186|
|Mariscos Playa Es...|YTqtM2WFhcMZGeAGA...|  Las Vegas|   NV|   Mexican|              true|  4.5|         330|
|       Baja Miguel's|Oto60yDwk1z72WmfW...|  Las Vegas|   NV|   Mexican|              true|  3.0|         175|
|         Gecko Grill|aP2Ma-Wx2lydppntC...|    Gilbert|   AZ|   Mexican|              true|  3.5|         201|
|

In [5]:
average_rank = businessdf.agg(func.avg('stars'))
average_rank.show()

+-----------------+
|       avg(stars)|
+-----------------+
|3.436754507628294|
+-----------------+



##  For businesses within 15km of Toronto center, I show the average # stars and average # reviews by type of business category


Center: Toronto, CA
Latitude: 43.6532° N, 79.3832° W
The bounding circle for this problem is a ~15 km radius. A business falls in the region if it’s coordinates are within the circle.

We first need to import certain sql functions.For this question we also need to select the latitude and longitude, so that we can calculate the distance from Toronto using the co-rdinates by defining a distance UDF(user-defined function).  Then we filter using the distance functions to find only those businesses within 15 km of Toronto. Then we explode categories again since we need to group by categories. After this we groupby by category and find the average 
stars and review_count.

In [6]:
import numpy as np
from pyspark.sql.functions import udf,col,asc,desc
from pyspark.sql.types import DoubleType,StringType,BooleanType
import math

In [7]:
businessdf = sql.read.json('/Users/rohitsuvarna/Downloads/dataset/business.json')
businessdf = businessdf.select('name','business_id','city','state','categories',\
                               'latitude','longitude','stars','review_count')
businessdf = businessdf.dropna(subset=['latitude','longitude'])


def dist(lat2,lon2):
    lat2 = math.radians(lat2)
    lon2 = math.radians(lon2)
    lat1 = math.radians(43.6532)
    lon1 = math.radians(-79.3832)
    R = 6371
    return R*math.acos(math.sin(lat1)*math.sin(lat2) + math.cos(lat1)*math.cos(lat2)*math.cos(lon1 - lon2))
    

dist_udf = udf(dist,DoubleType())
businessdf = businessdf.filter(dist_udf(businessdf.latitude,businessdf.longitude) <=15)
businessdf = businessdf.withColumn("categories",func.explode(businessdf.categories))
businessdf = businessdf.groupBy("categories").agg(func.avg("stars"), func.avg("review_count"))
businessdf.show(10)
businessdf.repartition(1).write.csv("Q4.csv")#this will write df to single csv instead of writing diff csv acc to partitions 

+----------------+------------------+------------------+
|      categories|        avg(stars)| avg(review_count)|
+----------------+------------------+------------------+
|  Dermatologists|3.2142857142857144| 9.285714285714286|
|Historical Tours|              4.25|               8.0|
|         Beaches| 4.208333333333333|             22.75|
|   Skating Rinks|3.9444444444444446|              10.0|
|   Data Recovery| 4.583333333333333|              14.5|
|          Fondue|               3.5|              35.0|
|     Boat Repair|               5.0|               6.0|
|   Videographers| 4.333333333333333|               4.0|
|    Contract Law|               4.0|              13.0|
|        Day Spas|3.5242537313432836|14.951492537313433|
+----------------+------------------+------------------+
only showing top 10 rows



## For the top 10 and bottom 10 food businesses near Toronto (ranked by stars), I summarize star rating for reviews in January through May.

First we need to recreate the Dataframe which has businesses near Toronto.Also we need to import the review json, which has information about when the reviews were posted

In [8]:
businessdf = sql.read.json('/Users/rohitsuvarna/Downloads/dataset/business.json')
reviewdf = sql.read.json('/Users/rohitsuvarna/Downloads/dataset/review.json')
businessdf = businessdf.select('name','business_id','city','state','categories','latitude','longitude','stars','review_count')
businessdf = businessdf.dropna(subset=['latitude','longitude'])
businessdf = businessdf.filter(dist_udf(businessdf.latitude,businessdf.longitude) <=15)

Then we explode the categories to select only the 'Food' businesses near Toronto as mentioned in the question.

In [9]:
businessdf = businessdf.withColumn("categories",func.explode(businessdf.categories))
businessdf = businessdf.filter(businessdf.categories == 'Food')
businessdf.show(10)

+--------------------+--------------------+----------+-----+----------+-------------+--------------+-----+------------+
|                name|         business_id|      city|state|categories|     latitude|     longitude|stars|review_count|
+--------------------+--------------------+----------+-----+----------+-------------+--------------+-----+------------+
|    The Tea Emporium|v2WhjAB3PIBA8J8Vx...|   Toronto|   ON|      Food|   43.6771258|   -79.3532848|  4.5|           7|
|Paris Bakery & Pa...|kEq7eudoX5qdcaSLA...|   Toronto|   ON|      Food|   43.6624007|    -79.444706|  3.0|          16|
|Fortinos Supermarket|UZShf6G75npKCCjiH...|North York|   ON|      Food|   43.7166569|   -79.4472576|  3.5|           9|
|     Boardwalk Place|Z1r6b30Tg0n0ME4-Z...|   Toronto|   ON|      Food|   43.6630096|   -79.3108978|  3.0|          13|
|      Sangria Lounge|v86J4q6ATA2ANm1fc...|   Toronto|   ON|      Food|43.6435365586|-79.4479535911|  3.5|          23|
|          Second Cup|zcWit_aSGR5wiunYB.

Then I join the business and review df on date, and then use another UDF to filter the dates between Jan and March

In [10]:
Join_df = businessdf.join(reviewdf, businessdf.business_id == reviewdf.business_id).select(businessdf.name,businessdf.business_id,businessdf.city,businessdf.state,reviewdf.stars,reviewdf.date)
Join_df.show(10)


+-------------+--------------------+---------+-----+-----+----------+
|         name|         business_id|     city|state|stars|      date|
+-------------+--------------------+---------+-----+-----+----------+
|Select Bakery|09OYbFNrS1n8u5gE6...|East York|   ON|    1|2016-07-14|
|Select Bakery|09OYbFNrS1n8u5gE6...|East York|   ON|    5|2012-08-11|
|Select Bakery|09OYbFNrS1n8u5gE6...|East York|   ON|    1|2014-05-14|
|Select Bakery|09OYbFNrS1n8u5gE6...|East York|   ON|    5|2017-04-24|
|Select Bakery|09OYbFNrS1n8u5gE6...|East York|   ON|    2|2015-02-11|
|Select Bakery|09OYbFNrS1n8u5gE6...|East York|   ON|    4|2017-06-29|
|Select Bakery|09OYbFNrS1n8u5gE6...|East York|   ON|    5|2016-05-26|
|Select Bakery|09OYbFNrS1n8u5gE6...|East York|   ON|    3|2015-02-26|
|Select Bakery|09OYbFNrS1n8u5gE6...|East York|   ON|    4|2016-09-08|
|Select Bakery|09OYbFNrS1n8u5gE6...|East York|   ON|    4|2014-02-14|
+-------------+--------------------+---------+-----+-----+----------+
only showing top 10 

In [11]:
def Jan_thru_march(input_stri):
    accept_months = ['01','02','03','04','05']
    return input_stri[5:7] in accept_months

Date_udf = udf(Jan_thru_march,StringType())
Join_df = Join_df.filter(Date_udf(Join_df.date) == True)
Join_df.show(10)

+-------------+--------------------+---------+-----+-----+----------+
|         name|         business_id|     city|state|stars|      date|
+-------------+--------------------+---------+-----+-----+----------+
|Select Bakery|09OYbFNrS1n8u5gE6...|East York|   ON|    1|2014-05-14|
|Select Bakery|09OYbFNrS1n8u5gE6...|East York|   ON|    5|2017-04-24|
|Select Bakery|09OYbFNrS1n8u5gE6...|East York|   ON|    2|2015-02-11|
|Select Bakery|09OYbFNrS1n8u5gE6...|East York|   ON|    5|2016-05-26|
|Select Bakery|09OYbFNrS1n8u5gE6...|East York|   ON|    3|2015-02-26|
|Select Bakery|09OYbFNrS1n8u5gE6...|East York|   ON|    4|2014-02-14|
|Select Bakery|09OYbFNrS1n8u5gE6...|East York|   ON|    5|2011-04-12|
|Select Bakery|09OYbFNrS1n8u5gE6...|East York|   ON|    5|2015-05-23|
|Select Bakery|09OYbFNrS1n8u5gE6...|East York|   ON|    4|2013-05-09|
|Select Bakery|09OYbFNrS1n8u5gE6...|East York|   ON|    5|2013-02-06|
+-------------+--------------------+---------+-----+-----+----------+
only showing top 10 

Then I find the average star ratings for all the businesses considering ratings only between Jan and March

In [12]:
Avg_df = Join_df.groupBy('name').avg('stars')
Avg_df.show(10)

+------------------+------------------+
|              name|        avg(stars)|
+------------------+------------------+
|The Fresh Tea Shop|3.7058823529411766|
|   Golden Bakeshop|               5.0|
| Fahrenheit Coffee| 4.587155963302752|
|     Sugar & Spice|3.3333333333333335|
|     Umi's Kitchen|               4.5|
|  House of Moments| 3.272727272727273|
|           Penny's|               4.0|
|      Oxford Fruit|               4.0|
|             Corks|              4.25|
|    The Cannonball|3.3529411764705883|
+------------------+------------------+
only showing top 10 rows



After this I use 'order by' to find the top 10 businesses and then the bottom 10 businesses and finally I use Union to combine both the dataframes

In [13]:
Best_df = Avg_df.orderBy(desc('avg(stars)'),asc('name'))
Best_df = Best_df.limit(10)
Worst_df = Avg_df.orderBy('avg(stars)',asc('name'))
Worst_df = Worst_df.limit(10)
Combined_df = Best_df.union(Worst_df)
Combined_df.show()
Combined_df.repartition(1).write.csv("Q5.csv")#this will write df to single csv instead of writing diff csv acc to partitions 

+--------------------+----------+
|                name|avg(stars)|
+--------------------+----------+
|2nd Nature Bakery...|       5.0|
|        Academy Cafe|       5.0|
|Alimentari Italia...|       5.0|
|Allen's Scottish ...|       5.0|
|          Amico Chef|       5.0|
|Ararat Specialty ...|       5.0|
|     Aren't We Sweet|       5.0|
|      Avenue Seafood|       5.0|
|        Azienda Cafe|       5.0|
|   Babycake Cupcakes|       5.0|
|  Alfredos Fine Food|       1.0|
|    Amato King Pizza|       1.0|
|         Baroli Cafe|       1.0|
|          Blue Goose|       1.0|
|        Bonga Buldak|       1.0|
|        Castle Fruit|       1.0|
|     Chocolate Charm|       1.0|
|         Coffeeholic|       1.0|
| Country Style Donut|       1.0|
|               Doria|       1.0|
+--------------------+----------+

