In [1]:
#setup
import sys 
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
#setting paths for data files
business_path = 'business.csv'
review_path = 'review.csv'

# create Spark context with Spark configuration
conf = SparkConf().setAppName("Business and Reviews File.")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

In [3]:
#preprocessing the business data

# read input business csv file to RDD
business = sc.textFile(business_path)
business_data = list()

# collect the RDD to a list
llist = business.collect()

# checking the line type 
for line in llist:
    business_data.append(tuple(line.split('::')))
    
#checking for the records as tuples
#for line in business_data:
#    print(line)

#setting up the file headers
business_columns = ['businessid','address','categories']

#converting the RDD to the DataFrame
business_df = sqlContext.createDataFrame(business_data,business_columns)

#printing the DF
business_df.show()
print(business_df.count())

+--------------------+--------------------+--------------------+
|          businessid|             address|          categories|
+--------------------+--------------------+--------------------+
|Iu-oeVzv8ZgP18NIB...|3320 S Hill StSou...|List(Medical Cent...|
|ae5tm46-ewAhUJ6HR...|12 Arrow StSte 10...|List(Massage, Bea...|
|lb9qELwduFgBo408x...|19 King St E2nd f...|List(Tattoo, Beau...|
|HPWmjuivv3xJ279qS...|325 W State StWes...|List(Music & DVDs...|
|DARh6yjrGGVZ7R3Gd...|2803 San Jacinto ...|List(Food, Coffee...|
|ZKcVHVlD-kwmXFlnP...|817 West Peachtre...|List(Property Man...|
|HIPGr2gSEN4T73tjz...|1 Palmer Sq EPrin...|List(Pubs, Bars, ...|
|A_Fm4v2-gQuGBBI-G...|16 Division StCoh...|List(Pizza, Resta...|
|-ilTnXu41RrxQITuo...|3218 Market StUni...|List(Food, Coffee...|
|L5ABFgTmN-T-uZjuG...|5829 Kirby DrWest...|List(Food, Specia...|
|M1pXCGikSigFib_ZZ...|257 Marlborough S...|List(Event Planni...|
|__PHRVOgyR-WgkKPH...|4126 Executive Dr...|List(Elementary S...|
|9j6aqUdpPojPFkb26...|800

In [4]:
#preprocessing the review data

# read input review csv file to RDD
review = sc.textFile(review_path)
review_data = list()

# collect the RDD to a list
llist = review.collect()

# checking the line type 
for line in llist:
    review_data.append(tuple(line.split('::')))
    
#checking for the records as tuples
#for line in review_data:
#    print(line)

#setting up the file headers
review_columns = ['reviewid','userid','businessid','stars']

#converting the RDD to the DataFrame
review_df = sqlContext.createDataFrame(review_data,review_columns)

#printing the DF
review_df = review_df.drop(review_df.reviewid)
review_df.show()
print(review_df.count())

+--------------------+--------------------+-----+
|              userid|          businessid|stars|
+--------------------+--------------------+-----+
|kT43SxDgMGzbeXpO5...|wbpbaWBfU54JbjLID...|  5.0|
|T9hGHsbJW9Hw1cJAl...|4iTRjN_uAdAb7_YZD...|  5.0|
|Z_WAxc4RUpKp3y12B...|qw5gR8vW7mSOK4VRO...|  4.0|
|OlMjqqzWZUv2-62CS...|81IjU5L-t-QQwsE38...|  4.0|
|fs5bpfk-2pvq2v8S1...|Hnz1_h_D1eHSRtQqH...|  2.0|
|jm0kgXVEdcQ_tQNzs...|ZRm8fSEBn8DsSLD4o...|  4.0|
|1IzWxAfxuHTnzKOup...|-tphABJRkegXV4Fr1...|  4.0|
|n_zEixNdHIh9bwEfj...|kYD1PIXBsyZQXG_AE...|  4.0|
|bkIgi_PJyr6-2doVw...|Q0t7EEaygJ2hb4uFn...|  4.0|
|Hjpn4meBXbjJHd8bk...|fcdjnsgO8Z5LthXUx...|  3.0|
|E70xNGVyUEeoNw3tr...|JrGSfjRqAtIZjjwC3...|  5.0|
|NNFVXXEkb9Ur2ihVk...|jTPr0dFk2JgGYdDiI...|  5.0|
|Jtfz1WbIB_FtYDkO3...|8sGKlCVtewMuA2RGy...|  3.0|
|H7s0wAZUKc0V1nahM...|JrGSfjRqAtIZjjwC3...|  5.0|
|c4mkYDTjJc-_lOM32...|pTTUqKB3TxaW43ElW...|  4.0|
|NJ_U2zzmgI53dSPN0...|jTPr0dFk2JgGYdDiI...|  2.0|
|-uQcgLHrjba2hV14h...|V9DXseGJNzLeCxUpM...|  3.0|


In [5]:
#filtering Stanford businesses
temp = business_df.filter(business_df.address\
                          .like('%Stanford, CA%'))\
                  .drop(business_df.address)\
                  .drop(business_df.categories)

temp.show()

print(temp.count())

+--------------------+
|          businessid|
+--------------------+
|PBqjmOB7Yjti3cFqy...|
|PiiDzC9wPzfVA-xyT...|
|dTn4zWESE49YRd5Mo...|
|4soRFJ8cijXfCeWj7...|
|-ar6SC9nUaam_KbOJ...|
|f_y0wlUALp8O95b3s...|
|9rdigHXd49AKYopcG...|
|A9F5f4aXXkrGCnHR6...|
|SiBhiwJdO6ivfnMID...|
|2t05Wbot3-yqN1Dbf...|
|kzV63tmirtI1La8-f...|
|szvwhj2QsblOU5yn3...|
|VMQv_BIaKyORO0YKa...|
|3CU32j5m54UJg45DD...|
|YmupeGDFKFLPfPUAe...|
|F-FigR53J4BFf8fWI...|
|vNxTvFz2cKLmoHVIf...|
|zC5XpSAh0cojJzKsG...|
|DywFslWJ0MtA2_ycl...|
|mQSwnJ31A-fxscL8-...|
+--------------------+
only showing top 20 rows

220


In [6]:
#table joins for business and table
business_review_table = temp.join(review_df, review_df.businessid==temp.businessid)\
                            .distinct()\
                            .drop(temp.businessid)\
                            .drop(review_df.businessid)

business_review_table.show()

print(business_review_table.count())

+--------------------+-----+
|              userid|stars|
+--------------------+-----+
|iL_CR-WQnveqrff9L...|  5.0|
|BPkM2rnhMO68Le6Xs...|  5.0|
|M6h9KYZHFaZYmVFop...|  1.0|
|4_fkSSTFYEUXYPhtp...|  5.0|
|ocv3RMJZFScAMHmud...|  5.0|
|SSvalZkiN3vqOSH76...|  5.0|
|9OyPEDQ0N6i0rWv9S...|  5.0|
|0sZ1BLZEocy6fl5Gy...|  5.0|
|T_ua0osjaKMUmyhwO...|  5.0|
|FHfjAReTovbu0chLv...|  4.0|
|rhHvKILdT3wYowrO2...|  3.0|
|LW-4t2psvoehLqkcV...|  3.0|
|7b9aT8BhpWaFncPaP...|  4.0|
|QBgKo-REQCVZBQXT6...|  4.0|
|tIRh_AmxJhUgzoHUB...|  3.0|
|8fgiuFMuN5yfoALzt...|  2.0|
|cJVqHvDLN9g3GE4Vf...|  3.0|
|ChfUouMqqio8Zin2w...|  2.0|
|DhOSuTlWNnlzoF0Qx...|  3.0|
|mZBwEijHmWFTyPktV...|  1.0|
+--------------------+-----+
only showing top 20 rows

1801


In [7]:
def print_format(x,y):
    return str(x) + '\t' + str(y)

print(print_format('1','2'))

1	2


In [8]:
#saving the question 1 answer into a text file
print_format_function = udf(lambda x,y : print_format(x,y), StringType())

_business_review_rating_df = business_review_table.withColumn('stars',\
                                                   print_format_function(col('userid'),col('stars')))\
                                                  .drop(col('userid'))

_business_review_rating_df.write.text('spark_sql_version/userid_rating_table.txt')

In [9]:
def to_float(x):
    return float(x)

print(to_float('1.0'))

1.0


In [10]:
to_float_function = udf(lambda x : to_float(x), FloatType())

review_df = review_df.withColumn('stars', to_float_function(col('stars')))

avg_review_df = review_df.groupBy('businessid').avg('stars')

avg_review_df.show()

+--------------------+------------------+
|          businessid|        avg(stars)|
+--------------------+------------------+
|tIIof6-zqlgxImFND...|3.9234234234234235|
|b7lMDnk5k84aEBQP_...|              4.75|
|Hc3d_UDV7qE30o6We...|              3.55|
|8ryg43cozq2DXB6ig...|               3.2|
|TuPliHI_pmNQa9ve2...| 3.477272727272727|
|UZ0adu-4hd27k-_UR...| 4.218905472636816|
|73Gb_mlpMyq6K34Oh...|3.0704225352112675|
|2zHPIfZF3-oiPpfaF...|3.2777777777777777|
|tbg4gR3jUUllR28cy...| 3.911392405063291|
|lpE4ayYybUrRFnXb3...|               3.0|
|R109PJPOP1puLUHQW...| 3.806451612903226|
|Ryt1Fhgz7sixMQSJi...|2.8076923076923075|
|MF_WPyya1KhokJg5_...|               1.0|
|I_0XYZrhi2A7weELf...|               1.0|
|oMy_UrvOsjcRw0rb3...| 3.848101265822785|
|DZXp8m38R0s9U3Saj...|3.9036144578313254|
|Y_8ZwQYDznZX1-rs_...| 4.036144578313253|
|VzFBygBuHNbyfpQkc...|             2.125|
|pBkkizqn2jsP6GJ-J...|               5.0|
|F1IDVrjun6KEfscaY...|               4.5|
+--------------------+------------

In [11]:
top_ten_business = avg_review_df.orderBy(col('avg(stars)').desc()).take(10)

top_ten_business_df = sqlContext.createDataFrame(top_ten_business,avg_review_df.columns)\
                                .withColumnRenamed('businessid','bid')

top_ten_business_df.printSchema()

root
 |-- bid: string (nullable = true)
 |-- avg(stars): double (nullable = true)



In [12]:
business_rating_table =  business_df.join(top_ten_business_df,\
                                         (business_df.businessid == top_ten_business_df.bid))\
                                    .distinct()\
                                    .drop('bid')

business_rating_table.show()

+--------------------+--------------------+--------------------+----------+
|          businessid|             address|          categories|avg(stars)|
+--------------------+--------------------+--------------------+----------+
|V0BNdhFIpzxKIGYAo...|1712 W Jefferson ...|List(Elementary S...|       5.0|
|VtLD3XfTT7T9T70UD...|3636 Nobel DrSte ...|List(Real Estate ...|       5.0|
|XYsw_9orEYR1T7jRo...|143 Harvard Ave2n...|List(Professional...|       5.0|
|pBkkizqn2jsP6GJ-J...|4150 Regents Park...|List(Dentists, He...|       5.0|
|wl0VZ6n9LrMpDGy_J...|103 Carnegie CtrS...|List(Shopping, Co...|       5.0|
|yS8o6eShVkLh_psn6...|400 W Rosemary St...|List(Skin Care, H...|       5.0|
|5FJdQwSolZ9WQMnFZ...|4225 Executive Sq...|List(Professional...|       5.0|
|ANQJQELoPU2z_nCgO...|731 Peachtree Str...|List(Religious Or...|       5.0|
|ZCELOqe_TRJ3I5ldV...|707 S 16th StLafa...|List(Automotive, ...|       5.0|
|oBsbQlzSdK7XozGE6...|200 Lothrop StOak...|List(Doctors, Hos...|       5.0|
+-----------

In [13]:
def print_format(x,y,a,b):
    return str(x) + '\t' + str(y) + '\t' + str(a) + '\t' + str(b)

print(print_format('1','2','3','4'))

1	2	3	4


In [14]:
print_format_function = udf(lambda x,y,a,b : print_format(x,y,a,b), StringType())

_business_rating_table  =  business_rating_table.withColumn('businessid',\
                                                 print_format_function(\
                                                 col('businessid'), col('address'), col('categories'), col('avg(stars)')))\
                                                .drop('address')\
                                                .drop('categories')\
                                                .drop('avg(stars)')

_business_rating_table.write.text('spark_sql_version/business_rating_table.txt')