# Business File 

In [2]:
sc

<pyspark.context.SparkContext at 0x922c9e8>

In [3]:
rdd = sc.textFile("yelp_business.csv", use_unicode=False)

In [4]:
rdd.take(2)

['business_id,name,neighborhood,address,city,state,postal_code,latitude,longitude,stars,review_count,is_open,categories',
 'FYWN1wneV18bWNgQjJ2GNg,"""Dental by Design""",,"""4855 E Warner Rd, Ste B9""",Ahwatukee,AZ,85044,33.3306902,-111.9785992,4.0,22,1,Dentists;General Dentistry;Health & Medical;Oral Surgeons;Cosmetic Dentists;Orthodontists']

In [5]:
import operator

def extractCities(partId,records):
    if partId==0:
        records.next()
    import csv
    reader = csv.reader(records)
    for row in reader:
        yield (row[4]) #Cities #Put comma to explicity define a tuble of size one
        
cities = rdd.mapPartitionsWithIndex(extractCities)
cities = cities.map(lambda city: (city,1)).reduceByKey(lambda x,y: x+y)
cities = cities.toDF()

#Rename
cities = cities.select(cities['_1'].alias('City'),
                       cities['_2'].alias('Number_of_Restaurants'))
cities = cities.sort('Number_of_Restaurants',ascending = False)

total = cities.select("Number_of_Restaurants").agg({"Number_of_Restaurants": "sum"}).collect().pop()['sum(Number_of_Restaurants)']
print(total)
cities = cities.withColumn('Percentage', (cities['Number_of_Restaurants']/total) * 100)
cities.show(20)

174567
+-----------+---------------------+------------------+
|       City|Number_of_Restaurants|        Percentage|
+-----------+---------------------+------------------+
|  Las Vegas|                26775|15.337950471738646|
|    Phoenix|                17213| 9.860397440524268|
|    Toronto|                17206|  9.85638751883231|
|  Charlotte|                 8553| 4.899551461616457|
| Scottsdale|                 8228| 4.713376525918416|
| Pittsburgh|                 6355|3.6404360503417026|
|       Mesa|                 5760| 3.299592706525288|
|  MontrÃ©al|                 5709|3.2703775627695957|
|  Henderson|                 4465|2.5577571935130923|
|      Tempe|                 4263|2.4420423104023095|
|   Chandler|                 3994|2.2879467482399307|
|  Edinburgh|                 3868|  2.21576815778469|
|  Cleveland|                 3322|1.9029942658119807|
|    Madison|                 3213|1.8405540566086374|
|   Glendale|                 3206|1.8365441349166796|
|  

In [6]:
cities.count()

1094

In [7]:
#Percentage of bussiness with stars 
def extractStars(partId,records):
    if partId==0:
        records.next()
    import csv
    reader = csv.reader(records)
    for row in reader:
        yield (row[9]) #Cities #Put comma to explicity define a tuble of size one
stars = rdd.mapPartitionsWithIndex(extractStars)
stars = stars.map(lambda star: (star,1)).reduceByKey(lambda x,y: x+y)
stars = stars.toDF()

stars = stars.select(stars['_1'].alias('Stars'),
                       stars['_2'].alias('Number_of_Stars'))
stars = stars.sort('Number_of_Stars',ascending = False)
stars.show()

+-----+---------------+
|Stars|Number_of_Stars|
+-----+---------------+
|  4.0|          33492|
|  3.5|          32038|
|  5.0|          27540|
|  4.5|          24796|
|  3.0|          23142|
|  2.5|          16148|
|  2.0|           9320|
|  1.5|           4303|
|  1.0|           3788|
+-----+---------------+



# Reviews and Dates

In [8]:
def extractRow(index, lines):
    import csv
    if index == 0:
        lines.next()
        
    reader = csv.reader(lines)
    for row in reader:
        if len(row) == 9:
            yield row

In [9]:
rdd = sc.textFile("yelp_review.csv", use_unicode=False)
rdd = rdd.mapPartitionsWithIndex(extractRow)
rdd = rdd.map(lambda x: [x[0],x[1],x[2],x[3],x[4],x[5],x[6],x[7],x[8]])

In [10]:
dates_rdd = rdd.map(lambda row: (row[4],1))

from operator import add
dates_rdd = dates_rdd.reduceByKey(add).cache()

dates_rdd.take(2)

[('2011-08-24', 979), ('2014-01-11', 1664)]

In [11]:
#FILTER VALENTINE DAYS ONLY

valentines_dates = []
for year in range(4,18):
    valentines_dates.append('20' + ("0"+str(year) if year < 10 else str(year)) + '-02-14')
    
valentines_rdd = dates_rdd.filter(lambda row: row[0] in valentines_dates)
valentines_rdd.take(12) #WORKS

[('2010-02-14', 379),
 ('2006-02-14', 9),
 ('2008-02-14', 84),
 ('2014-02-14', 1381),
 ('2007-02-14', 51),
 ('2013-02-14', 1147),
 ('2016-02-14', 3448),
 ('2015-02-14', 2346),
 ('2012-02-14', 883),
 ('2017-02-14', 2506),
 ('2011-02-14', 805),
 ('2009-02-14', 152)]

In [12]:
#GET ALL REVIEWS IN FEBRUARY
feb_days = []
for year in range(4,18):
    for day in range(1,30):
        date = '20' + ("0"+str(year) if year < 10 else str(year)) + '-02-' + ("0"+str(day) if day < 10 else str(day))
        feb_days.append(date)
        
feb_rdd = dates_rdd.filter(lambda row: row[0] in feb_days)
feb_rdd.take(5)

[('2015-02-06', 1944),
 ('2017-02-28', 2876),
 ('2016-02-20', 3257),
 ('2015-02-07', 2307),
 ('2010-02-18', 427)]

In [13]:
#Show all reviews for year 2016 in February
feb2016 = []
for day in range(1,30):
        date = '2016' + '-02-' + ("0"+str(day) if day < 10 else str(day))
        feb2016.append(date)
feb2016_rdd = feb_rdd.filter(lambda row: row[0] in feb2016)
feb2016_rdd.take(30)

[('2016-02-20', 3257),
 ('2016-02-27', 3121),
 ('2016-02-26', 2740),
 ('2016-02-25', 2705),
 ('2016-02-24', 2688),
 ('2016-02-29', 2959),
 ('2016-02-28', 3385),
 ('2016-02-18', 2797),
 ('2016-02-19', 2731),
 ('2016-02-12', 2619),
 ('2016-02-13', 3164),
 ('2016-02-10', 2681),
 ('2016-02-11', 2548),
 ('2016-02-16', 3155),
 ('2016-02-17', 2984),
 ('2016-02-14', 3448),
 ('2016-02-15', 3695),
 ('2016-02-09', 2490),
 ('2016-02-08', 2475),
 ('2016-02-01', 3037),
 ('2016-02-03', 2663),
 ('2016-02-02', 2577),
 ('2016-02-05', 2558),
 ('2016-02-23', 2705),
 ('2016-02-04', 2577),
 ('2016-02-22', 3180),
 ('2016-02-07', 3035),
 ('2016-02-21', 3450),
 ('2016-02-06', 3001)]

In [14]:
# Create the schema necessary for the creation of a DataFrame
from pyspark.sql.types import *
schema = StructType([
    StructField("Date", StringType(), False), 
    StructField("Number_of_Reviews", StringType(), False)])

In [15]:
feb2016_df = sqlContext.createDataFrame(feb2016_rdd, schema)

feb2016_df = feb2016_df.select(feb2016_df['Date'].cast(DateType()), 
                              feb2016_df['Number_of_Reviews'].cast(IntegerType())).cache()
feb2016_df.show(2)

+----------+-----------------+
|      Date|Number_of_Reviews|
+----------+-----------------+
|2016-02-20|             3257|
|2016-02-27|             3121|
+----------+-----------------+
only showing top 2 rows



In [16]:
#Sort february days
feb2016_df = feb2016_df.orderBy('Date')
feb2016_df.show(30)

+----------+-----------------+
|      Date|Number_of_Reviews|
+----------+-----------------+
|2016-02-01|             3037|
|2016-02-02|             2577|
|2016-02-03|             2663|
|2016-02-04|             2577|
|2016-02-05|             2558|
|2016-02-06|             3001|
|2016-02-07|             3035|
|2016-02-08|             2475|
|2016-02-09|             2490|
|2016-02-10|             2681|
|2016-02-11|             2548|
|2016-02-12|             2619|
|2016-02-13|             3164|
|2016-02-14|             3448|
|2016-02-15|             3695|
|2016-02-16|             3155|
|2016-02-17|             2984|
|2016-02-18|             2797|
|2016-02-19|             2731|
|2016-02-20|             3257|
|2016-02-21|             3450|
|2016-02-22|             3180|
|2016-02-23|             2705|
|2016-02-24|             2688|
|2016-02-25|             2705|
|2016-02-26|             2740|
|2016-02-27|             3121|
|2016-02-28|             3385|
|2016-02-29|             2959|
+-------

In [None]:
#SAVE IMAGES
seriesToPlotDUAL = []
year_pairs = []

for i in range(6,17):
    #for i in range(6,7):
    year_pairs.append((i,i+1))
    
for year in year_pairs: 
    month_days = []
    #DECEMBER
    for day in range(1,32):
        date = '20' + ("0"+str(year[0]) if year[0] < 10 else str(year[0])) +'-12-' + ("0"+str(day) if day < 10 else str(day))
        month_days.append(date)
        
    #JANUARY    
    for day in range(1,32):
        date = '20' + ("0"+str(year[1]) if year[1] < 10 else str(year[1])) +'-01-' + ("0"+str(day) if day < 10 else str(day))
        month_days.append(date)

    #Get rdd
    month_rdd = dates_rdd.filter(lambda row: row[0] in month_days)

    #From dataframe
    month_df = sqlContext.createDataFrame(month_rdd, schema)

    month_df = month_df.select(month_df['Date'].cast(DateType()), 
                          month_df['Number_of_Reviews'].cast(IntegerType()))
    month_df = month_df.orderBy('Date')

    #Get days and reviews for those days
    month_days = month_df.select("Date").rdd.flatMap(lambda x: x).collect()
#         for day in month_count:
#             print(day)
    month_days = map(lambda x: x.day,month_days)
    
    month_count = month_df.select("Number_of_Reviews").rdd.flatMap(lambda x: x).collect()
    month_x = range(1,63)
    seriesToPlotDUAL.append((month_x,month_count,month_days))
   