In [1]:
import json
from datetime import datetime
from pyspark import SparkConf, SparkContext
from operator import add

### Task1: Data Exploration (4.5pts)

Task description: You will explore the review dataset and write a program to answer the following questions.

In [None]:
# params = sys.argv
# input_file, output_file, stopwords = params[1], params[2], params[3], 
# y, m, n = int(params[4]), int(params[5]), int(params[6])

In [2]:
# Collects the answer to each Task 1 question under its letter key.
results = dict()

In [26]:
# Task 1 parameters, pinned here for interactive runs (the commented-out cell
# above shows the command-line form).
input_file = './review.json'   # review dataset loaded with sc.textFile below
# output_file = sys.argv[2]
# The last cell of this task opens `output_file` for writing, so it must be
# defined. This path is only a notebook default; change it as needed.
output_file = './task1_output.json'
stopwords = './stopwords'      # path of the stopword list; a later cell rebinds this name to the loaded word set
y = 2018                       # year counted in question B
m = 4                          # number of top users reported in question D
n = 5                          # number of top words reported in question E

In [4]:
# Reuse the running SparkContext when there is one. Calling SparkContext(...)
# while a context is already active (for example when this cell is re-run)
# raises "Cannot run multiple SparkContexts at once".
conf = SparkConf()
sc = SparkContext.getOrCreate(conf=conf)

In [5]:
# One parsed JSON object per line of the review file.
review = sc.textFile(input_file).map(json.loads)

A. The total number of reviews (0.5pts)

In [6]:
# Question A: each record contributes one review_id, so counting the ids
# counts the reviews.
results['A'] = (
    review
    .map(lambda rec: rec['review_id'])
    .count()
)
results

{'A': 1151625}

B. The number of reviews in a given year, y (1pts)

In [7]:
# Question B: keep the reviews whose timestamp falls in year `y`, then count them.
results['B'] = (
    review
    .filter(lambda rec: datetime.strptime(rec['date'], '%Y-%m-%d %H:%M:%S').year == y)
    .count()
)
results

{'A': 1151625, 'B': 202784}

C. The number of distinct users who have written the reviews (1pts)

In [8]:
# Question C: number of different user_id values across all reviews.
results['C'] = (
    review
    .map(lambda rec: rec['user_id'])
    .distinct()
    .count()
)
results

{'A': 1151625, 'B': 202784, 'C': 566269}

D. The top m users with the largest number of reviews, together with their review counts (1pt)

In [9]:
# Question D: count reviews per user and keep the m most active users.
# Ordering by (-count, user_id) breaks ties between equal counts
# deterministically, matching the ordering used for question E. A key of the
# count alone leaves the order of tied users to the partitioning.
results['D'] = (
    review
    .map(lambda rec: (rec['user_id'], 1))
    .reduceByKey(add)
    .takeOrdered(m, key=lambda pair: (-pair[1], pair[0]))
)
results

{'A': 1151625,
 'B': 202784,
 'C': 566269,
 'D': [('CxDOIDnH8gp9KXzpBHJYXw', 715),
  ('bLbSNkLggFnqwNNzzq-Ijw', 424),
  ('PKEzKWv_FktMm2mGPjwd0Q', 322),
  ('DK57YibC5ShBmqQl97CKog', 291)]}

E. The top n most frequent words in the review text. The words should be in lower case. The following punctuation marks “(”, “[”, “,”, “.”, “!”, “?”, “:”, “;”, “]”, “)” and the given stopwords are excluded (1pt)

In [10]:
# Characters removed from every token before counting.
punctuations = set('([,.!?:;])')

# `stopwords` starts out as the path string from the parameter cell and is
# rebound here to the set of words read from that file. The isinstance guard
# keeps this cell re-runnable once the set exists.
# NOTE: re-running the parameter cell resets `stopwords` to the path string;
# this cell must then be run again, otherwise `word in stopwords` silently
# becomes a substring test against the path.
if isinstance(stopwords, str):
    with open(stopwords) as stopwords_fh:  # closed when the block exits
        stopwords = {line.strip() for line in stopwords_fh}

In [17]:
def exclude_words(word):
    """Strip punctuation from a token and drop it if it is not a countable word.

    Reads the module-level `punctuations` (characters to remove) and
    `stopwords` (expected to be the loaded set of words, not the file path).

    Args:
        word: A lower-cased token from the review text.

    Returns:
        The token with every character in `punctuations` removed, or None
        when the token is a stopword (before or after stripping) or when
        nothing is left after stripping. Callers filter out the None values.
    """
    cleaned = ''.join(ch for ch in word if ch not in punctuations)
    # Test the stripped form as well: "the," must be excluded just like "the".
    # An empty result (blank token or punctuation only) is not a word either.
    if not cleaned or word in stopwords or cleaned in stopwords:
        return None
    return cleaned

In [30]:
# Question E: lower-case and split the review text, clean each token, count
# the remaining words and keep the n most frequent (ties broken alphabetically).
# The chain is wrapped in parentheses because a comment line after a backslash
# continuation ends the statement, leaving the next indented line as a syntax error.
tmp = (
    review
    .map(lambda r: r['text'])
    .flatMap(lambda t: t.lower().split(' '))
    .map(lambda w: (exclude_words(w), 1))
    .filter(lambda w: w[0] is not None)   # exclude_words returns None for dropped tokens
    .reduceByKey(add)
    # .top(n, key=lambda c: c[1])          # alternative without a tie-breaker
    .sortBy(lambda c: (-c[1], c[0]))
    .take(n)
)
tmp

[('the', 6160209),
 ('and', 4431354),
 ('i', 3235332),
 ('a', 3213701),
 ('was', 2254455)]

In [31]:
# Only the words go into the answer; the counts are dropped.
results['E'] = [pair[0] for pair in tmp]
results['E']

['the', 'and', 'i', 'a', 'was']

In [None]:
# Serialise the collected Task 1 answers as one JSON document.
with open(output_file, 'w+') as out_fh:
    out_fh.write(json.dumps(results))

### Task2: Exploration on Multiple Datasets (4pts)

Task description: In Task 2, you will explore the two datasets together (i.e., review and business) and write a program that computes the average stars for each business category and outputs the top n categories with the highest average stars. The business categories should be extracted from the “categories” tag in the business file. The categories should be split by comma and stripped of leading and trailing spaces. No other processing of the contents of the “categories” tag in the business file is needed. Stars are extracted from the review dataset. The two datasets are joined on the “business_id” tag. You need to implement a version without Spark (2pts) and a version with Spark (2pts). You can then compare their performance yourself (not graded).

In [None]:
# params = sys.argv
# input_file, business_file, output_file = params[1], params[2], params[3], 
# if_spark, n = params[4], int(params[5])

In [2]:
# Task 2 parameters, pinned here for interactive runs (the commented-out cell
# above shows the command-line form).
review_file = './review.json'
business_file = './business.json'
# output_file = ''
# The last cell of this task opens `output_file` for writing, so it must be
# defined. This path is only a notebook default; change it as needed.
output_file = './task2_output.json'
if_spark = 'spark'   # the loading cell of the Spark section runs only for the value 'spark'
n = 10               # number of top categories to report

In [3]:
# Output container for Task 2; filled under the 'result' key further down.
results = dict()

##### W/ Spark

In [28]:
# Spark variant only: attach to the existing context (or start one) and load
# both datasets as RDDs of parsed JSON records.
if if_spark == 'spark':
    sc = SparkContext.getOrCreate()

    review = sc.textFile(review_file).map(json.loads)
    business = sc.textFile(business_file).map(json.loads)

In [29]:
# Key both datasets by business_id so they can be joined.
review_rdd = review.map(lambda rec: (rec['business_id'], rec['stars']))
business_rdd = business.map(lambda biz: (biz['business_id'], biz['categories']))

In [30]:
# Per business: (sum of stars, number of reviews).
# reduceByKey merges partial (sum, count) pairs within each partition before
# the shuffle. groupByKey would move every individual rating and build the
# full list of ratings per business first.
stars = (
    review_rdd
    .mapValues(lambda rating: (rating, 1))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
)

In [31]:
# Peek at two (business_id, (star_sum, review_count)) pairs.
stars.take(2)

[('abaIvBrlg3QI4FUG1v3bdA', (4.0, 1)), ('M8a5DRdXl8KMu4bMFLPgQg', (85.0, 20))]

In [32]:
# Drop businesses without a categories value, then split the comma-separated
# string into a list of trimmed category names.
categories = (
    business_rdd
    .filter(lambda pair: pair[1] is not None)
    .mapValues(lambda raw: [name.strip() for name in raw.split(',')])
)

In [33]:
# Peek at two (business_id, [category, ...]) pairs.
categories.take(2)

[('1SWheh84yJXfytovILXOAQ', ['Golf', 'Active Life']),
 ('QXAEGFB4oINsVuTFxEYKFQ',
  ['Specialty Food',
   'Restaurants',
   'Dim Sum',
   'Imported Food',
   'Food',
   'Chinese',
   'Ethnic Food',
   'Seafood'])]

In [34]:
# Attach each business's (star_sum, review_count) pair to its category list.
# The aggregation cell further down filters out entries whose star value is None.
aggregated = categories.leftOuterJoin(stars)

In [35]:
# Peek at two joined records: (business_id, ([categories], (star_sum, review_count))).
aggregated.take(2)

[('jZ23B--fu21is2zrWiy4Kg',
  (['Pet Groomers', 'Pets', 'Pet Sitting', 'Pet Services'], (10.0, 2))),
 ('2ffee6OI50skuFyEVmOBZQ',
  (['Eyelash Service', 'Beauty & Spas', 'Skin Care'], (5.0, 1)))]

In [36]:
# tmp = aggregated.map(lambda id_c_s: id_c_s[1]).filter(lambda c_s: c_s[1]!=None) \
#                 .flatMap(lambda c_s: [(c, c_s[1]) for c in c_s[0]]) \
#                 .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1])).mapValues(lambda s: s[0]/s[1]) \
#                 .top(n, key=lambda s:s[1])
# tmp

In [37]:
# Spread each business's (star_sum, review_count) over all of its categories,
# add the pairs up per category, turn them into averages and keep the n best
# (highest average first, category name as tie-breaker).
tmp = (
    aggregated
    .map(lambda kv: kv[1])
    .filter(lambda cs: cs[1] is not None)
    .flatMap(lambda cs: [(cat, cs[1]) for cat in cs[0]])
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    .mapValues(lambda tot: tot[0] / tot[1])
    .sortBy(lambda row: (-row[1], row[0]))
    .take(n)
)
tmp

[('Astrologers', 5.0),
 ('Audio/Visual Equipment Rental', 5.0),
 ('Bocce Ball', 5.0),
 ('Calabrian', 5.0),
 ('Chinese Martial Arts', 5.0),
 ('Christmas Markets', 5.0),
 ('Crane Services', 5.0),
 ('DIY Auto Shop', 5.0),
 ('Duplication Services', 5.0),
 ('Fireworks', 5.0)]

In [38]:
# Store each (category, average) pair as a two-element list.
results['result'] = [[category, average] for category, average in tmp]
results

{'result': [['Astrologers', 5.0],
  ['Audio/Visual Equipment Rental', 5.0],
  ['Bocce Ball', 5.0],
  ['Calabrian', 5.0],
  ['Chinese Martial Arts', 5.0],
  ['Christmas Markets', 5.0],
  ['Crane Services', 5.0],
  ['DIY Auto Shop', 5.0],
  ['Duplication Services', 5.0],
  ['Fireworks', 5.0]]}

##### W/O Spark

In [15]:
# Grab the second line of the business file as a sample record.
# Only two lines are read (not the whole file), and the handle is closed when
# the block exits.
with open(business_file, encoding="utf8") as business_fh:
    next(business_fh)      # skip the first record
    d = next(business_fh)  # second line, i.e. what readlines()[1] returned
d

'{"business_id":"QXAEGFB4oINsVuTFxEYKFQ","name":"Emerald Chinese Restaurant","address":"30 Eglinton Avenue W","city":"Mississauga","state":"ON","postal_code":"L5R 3E7","latitude":43.6054989743,"longitude":-79.652288909,"stars":2.5,"review_count":128,"is_open":1,"attributes":{"RestaurantsReservations":"True","GoodForMeal":"{\'dessert\': False, \'latenight\': False, \'lunch\': True, \'dinner\': True, \'brunch\': False, \'breakfast\': False}","BusinessParking":"{\'garage\': False, \'street\': False, \'validated\': False, \'lot\': True, \'valet\': False}","Caters":"True","NoiseLevel":"u\'loud\'","RestaurantsTableService":"True","RestaurantsTakeOut":"True","RestaurantsPriceRange2":"2","OutdoorSeating":"False","BikeParking":"False","Ambience":"{\'romantic\': False, \'intimate\': False, \'classy\': False, \'hipster\': False, \'divey\': False, \'touristy\': False, \'trendy\': False, \'upscale\': False, \'casual\': True}","HasTV":"False","WiFi":"u\'no\'","GoodForKids":"True","Alcohol":"u\'full_

In [16]:
# Parse the sample line and read its 'stars' field.
json.loads(d)['stars']

2.5

In [4]:
def json_to_dict(file, keys):
    """Read a JSON-lines file and keep only the requested fields of each record.

    Args:
        file: Path of a UTF-8 text file holding one JSON object per line.
        keys: Iterable of field names to copy out of every record.

    Returns:
        A list with one dict per non-blank line, mapping each name in `keys`
        to that record's value.

    Raises:
        KeyError: If a record lacks one of `keys`.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    result = []
    # Stream the file line by line and close it when done.
    with open(file, encoding="utf8") as fh:
        for line in fh:
            if not line.strip():
                continue  # tolerate blank lines, e.g. a trailing newline
            record = json.loads(line)  # parse once per line, not once per key
            result.append({k: record[k] for k in keys})
    return result

# Load only the fields each side of the join needs.
stars = json_to_dict(review_file, ['business_id', 'stars'])
categories = json_to_dict(business_file, ['business_id', 'categories'])

In [17]:
# business_id -> [sum of stars, number of reviews]
stars_agg = {}
for record in stars:
    totals = stars_agg.setdefault(record['business_id'], [0, 0])
    totals[0] += record['stars']
    totals[1] += 1
stars_agg

{'mRUVMJkUGxrByzMQ2MuOpA': [39.0, 10],
 'LUN6swQYa4xJKaM_UEUOEw': [130.0, 29],
 'NyLYY8q1-H3hfsTwuwLPCg': [410.0, 91],
 '6lj2BJ4tJeu7db5asGHQ4w': [50.0, 18],
 'Mem13A3C202RzT53npn4NA': [50.0, 12],
 'I4Nr-MVc26qWr08-S3Q1ow': [238.0, 58],
 'YSUcHqlKMPHHJ_cTrqtNrA': [34.0, 14],
 'wJj1EwYcXHdvA9zKqmb5hQ': [20.0, 4],
 'EIL41z-hvVCeYHqfA9PyWQ': [180.0, 40],
 'rBo_suDMdGmFQ0TtYg3b2Q': [9.0, 6],
 'PZMpjUQ9_JVCf4Htu6vZYg': [12.0, 6],
 'CZKkoMlqu0N0zCFK-3T_Fg': [203.0, 50],
 'ZNB91myFoOYgyXoG5LQeGQ': [5.0, 5],
 'KjicU7uxRt2KDEnO5cgxDQ': [58.0, 21],
 'z9aXGRH8xtqpNDFE5_I3KA': [74.0, 17],
 'nDaW2hhQV5KYiGH7HzAOcg': [74.0, 18],
 '8KmqWgL0UEdxJFwTZ_YZvQ': [87.0, 19],
 '5qG4UHurI1yEozwn25WAFw': [200.0, 50],
 'jg72eKbD8rlv0i1DRN5uxQ': [8.0, 3],
 'EgwGTDZ705TwudPJwAY0yQ': [86.0, 20],
 'PFmB_iraAbTN9XMka7soqQ': [134.0, 28],
 'sT6KKMATRTiNSJ896D7l2w': [36.0, 14],
 'Gyrez6K8f1AyR7dzW9fvAw': [378.0, 100],
 '1A1b0ELki8v-C-0VtdoGAg': [4.0, 1],
 '9nTF596jDvBBia2EXXiOOg': [31.0, 8],
 'HkbNItNrnXlNo59M0YyoMg': 

In [6]:
# Inspect the second extracted business record.
categories[1]

{'business_id': 'QXAEGFB4oINsVuTFxEYKFQ',
 'categories': 'Specialty Food, Restaurants, Dim Sum, Imported Food, Food, Chinese, Ethnic Food, Seafood'}

In [18]:
# business_id -> list of trimmed category names; businesses whose categories
# value is None are left out.
ctgr_agg = {
    biz['business_id']: [name.strip() for name in biz['categories'].split(',')]
    for biz in categories
    if biz['categories'] is not None
}
ctgr_agg

{'1SWheh84yJXfytovILXOAQ': ['Golf', 'Active Life'],
 'QXAEGFB4oINsVuTFxEYKFQ': ['Specialty Food',
  'Restaurants',
  'Dim Sum',
  'Imported Food',
  'Food',
  'Chinese',
  'Ethnic Food',
  'Seafood'],
 'gnKjwL_1w79qoiV3IC_xQQ': ['Sushi Bars', 'Restaurants', 'Japanese'],
 'xvX2CttrVhyG2z1dFg_0xw': ['Insurance', 'Financial Services'],
 'HhyxOkGAM07SRYtlQ4wMFQ': ['Plumbing',
  'Shopping',
  'Local Services',
  'Home Services',
  'Kitchen & Bath',
  'Home & Garden',
  'Water Heater Installation/Repair'],
 '68dUKd8_8liJ7in4aWOSEA': ['Shipping Centers',
  'Couriers & Delivery Services',
  'Local Services',
  'Printing Services'],
 '5JucpCfHZltJh5r1JabjDg': ['Beauty & Spas', 'Hair Salons'],
 'gbQN7vr_caG_A1ugSmGhWg': ['Hair Salons',
  'Hair Stylists',
  'Barbers',
  "Men's Hair Salons",
  'Cosmetics & Beauty Supply',
  'Shopping',
  'Beauty & Spas'],
 'Y6iyemLX_oylRpnr38vgMA': ['Nail Salons', 'Beauty & Spas', 'Day Spas'],
 '4GBVPIYRvzGh4K4TkRQ_rw': ['Beauty & Spas',
  'Nail Salons',
  'Day Sp

In [8]:
# Dict keys are already unique, so the sizes are the distinct business counts
# on each side of the join.
len(stars_agg), len(ctgr_agg)

(148572, 192127)

In [9]:
### wrong!!!
# joined_dict = {} #{ctgr: (sum, len)}
# for bs_id, ctgrs in ctgr_agg.items():
#     for ctgr in ctgrs:
#         if bs_id in stars_agg:
#             if ctgr not in joined_dict:
#                 joined_dict[ctgr] = stars_agg[bs_id]
#             else:
#                 joined_dict[ctgr][0] += stars_agg[bs_id][0]
#                 joined_dict[ctgr][1] += stars_agg[bs_id][1]
# joined_dict

In [19]:
# category -> [sum of stars, number of reviews] over every reviewed business
# that lists the category.
# Each category gets its own fresh accumulator list, so in-place updates can
# never reach the per-business lists stored in stars_agg.
joined_dict = {}
for bs_id, (star_sum, review_count) in stars_agg.items():
    # Businesses with reviews but no category entry contribute nothing.
    for c in ctgr_agg.get(bs_id, ()):
        totals = joined_dict.setdefault(c, [0, 0])
        totals[0] += star_sum
        totals[1] += review_count
joined_dict

{'Restaurants': [2692071.0, 723938],
 'Breakfast & Brunch': [429101.0, 111793],
 'Burgers': [243122.0, 68237],
 'American (New)': [474471.0, 125765],
 'Sports Bars': [94678.0, 28398],
 'Bars': [701505.0, 189620],
 'Nightlife': [764749.0, 206984],
 'Hotels & Travel': [188699.0, 58590],
 'Travel Services': [6508.0, 1962],
 'Transportation': [16632.0, 5806],
 'Party & Event Planning': [25931.0, 6485],
 'Limos': [5366.0, 1511],
 'Event Planning & Services': [339684.0, 93858],
 'Fast Food': [127305.0, 39729],
 'Vegetarian': [100819.0, 25279],
 'Vegan': [79635.0, 19422],
 'Indian': [52013.0, 13785],
 'Sandwiches': [312779.0, 82244],
 'Southern': [46690.0, 12329],
 'Chicken Wings': [71525.0, 21603],
 'Caterers': [101362.0, 26179],
 'Polish': [4232.0, 1021],
 'Delis': [62409.0, 16412],
 'Pizza': [250471.0, 67599],
 'Italian': [253095.0, 67463],
 'Roofing': [2653.0, 688],
 'Home Services': [180071.0, 49536],
 'Custom Cakes': [7904.0, 2030],
 'Food': [993364.0, 260209],
 'Greek': [50906.0, 12909

In [55]:
# category -> average stars (sum of stars divided by number of reviews)
results_dict = {}
for category, (star_sum, review_count) in joined_dict.items():
    results_dict[category] = star_sum / review_count
results_dict

{'Restaurants': 3.755635342376966,
 'Breakfast & Brunch': 3.755635342376966,
 'Burgers': 3.755635342376966,
 'American (New)': 3.755635342376966,
 'Sports Bars': 3.755635342376966,
 'Bars': 3.755635342376966,
 'Nightlife': 3.755635342376966,
 'Hotels & Travel': 3.642283345882606,
 'Travel Services': 3.642283345882606,
 'Transportation': 3.642283345882606,
 'Party & Event Planning': 3.642283345882606,
 'Limos': 3.642283345882606,
 'Event Planning & Services': 3.642283345882606,
 'Fast Food': 3.7693775114226606,
 'Vegetarian': 3.7693775114226606,
 'Vegan': 3.7693775114226606,
 'Indian': 3.7693775114226606,
 'Sandwiches': 3.754745992658088,
 'Southern': 3.754745992658088,
 'Chicken Wings': 3.754745992658088,
 'Caterers': 3.7871098693686345,
 'Polish': 3.7871098693686345,
 'Delis': 3.7643233325192407,
 'Pizza': 3.7643233325192407,
 'Italian': 3.7488106564117287,
 'Roofing': 3.7243600748764787,
 'Home Services': 3.7243600748764787,
 'Custom Cakes': 3.7675351799691663,
 'Food': 3.76753517996

In [46]:
# Keep the n best categories: highest average first, name as tie-breaker.
results_dict = dict(
    sorted(
        results_dict.items(),
        key=lambda kv: (-kv[1], kv[0]),
    )[:n]
)
results_dict

{'Astrologers': 5.0,
 'Audio/Visual Equipment Rental': 5.0,
 'Bocce Ball': 5.0,
 'Calabrian': 5.0,
 'Chinese Martial Arts': 5.0,
 'Christmas Markets': 5.0,
 'Crane Services': 5.0,
 'DIY Auto Shop': 5.0,
 'Duplication Services': 5.0,
 'Fireworks': 5.0}

In [47]:
# Store each (category, average) entry as a two-element list.
results['result'] = [list(entry) for entry in results_dict.items()]
results

{'result': [['Astrologers', 5.0],
  ['Audio/Visual Equipment Rental', 5.0],
  ['Bocce Ball', 5.0],
  ['Calabrian', 5.0],
  ['Chinese Martial Arts', 5.0],
  ['Christmas Markets', 5.0],
  ['Crane Services', 5.0],
  ['DIY Auto Shop', 5.0],
  ['Duplication Services', 5.0],
  ['Fireworks', 5.0]]}

In [None]:
# Serialise the Task 2 result as one JSON document.
with open(output_file, 'w+') as out_fh:
    out_fh.write(json.dumps(results))

### Task3: Partition (4pts)

Task description: In this task, you will learn how partitions work in an RDD. You need to compute the businesses that have more than n reviews in the review file. In addition to the default way of partitioning the RDD, you should also design a customized partition function to improve computational efficiency. The “partition_type” is a hyperparameter of your program that decides which partition method to use. For both the default and the customized partition function, you need to show the number of partitions of the RDD, the number of items per partition, and the businesses that have more than n reviews (1pt for each partition type). Your customized partition function should improve computational efficiency, i.e., reduce the execution time (2pts).

In [None]:
# params = sys.argv
# input_file, output_file = params[1], params[2]
# partition_type, n_partitions, n = params[3], int(params[4]), int(params[5])

In [18]:
# Task 3 parameters, pinned here for interactive runs (the commented-out cell
# above shows the command-line form).
input_file = './review.json'
# output_file = ''
# The last cell of this task opens `output_file` for writing, so it must be
# defined. This path is only a notebook default; change it as needed.
output_file = './task3_output.json'
# partition_type = 'default'
partition_type = 'customized'   # any value other than 'default' applies the custom partitioner
n_partitions = 3                # number of partitions requested from the custom partitioner
n = 1024                        # report businesses with more than n reviews

In [3]:
# Output container for Task 3 (partition count, items per partition, result).
results = dict()

In [4]:
# Reuse the running SparkContext when there is one. Calling SparkContext(...)
# while a context is already active in this kernel raises
# "Cannot run multiple SparkContexts at once".
conf = SparkConf()
sc = SparkContext.getOrCreate(conf=conf)

# One parsed JSON object per line of the review file.
review = sc.textFile(input_file).map(lambda r: json.loads(r))

In [5]:
# One (business_id, 1) pair per review, ready to be summed per business.
bs_id_rdd = review.map(lambda rec: (rec['business_id'], 1))

In [9]:
# Custom partitioning: the partition function maps a business_id to the sum of
# the code points of its first and last characters.
if partition_type != 'default':
    bs_id_rdd = bs_id_rdd.partitionBy(
        n_partitions,
        lambda key: ord(key[0]) + ord(key[-1]),
    )

In [21]:
# Number of partitions, and how many items each one holds.
results['n_partitions'] = bs_id_rdd.getNumPartitions()
results['n_items'] = bs_id_rdd.glom().map(len).collect()
# Review count per business, keeping only those with more than n reviews.
tmp = (
    bs_id_rdd
    .reduceByKey(add)
    .filter(lambda pair: pair[1] > n)
    .collect()
)
results['result'] = [[business, count] for business, count in tmp]

In [22]:
# Show the collected Task 3 output.
results

{'n_partitions': 3,
 'n_items': [398690, 359496, 393439],
 'result': [['f4x1YBxkLrZg652xt2KR5g', 1039],
  ['K7lWdNUhCbcnEvI0NhGewg', 1156],
  ['RESDUcs7fIiihp38-d6_6g', 1502],
  ['4JNXUYY8wbaaDmk3BPzlWw', 1446]]}

In [None]:
# Serialise the Task 3 result as one JSON document.
with open(output_file, 'w+') as out_fh:
    out_fh.write(json.dumps(results))