In [1]:
import os
import sys
spark_path = os.environ['SPARK_HOME']
sys.path.append(spark_path + "/bin")
sys.path.append(spark_path + "/python")
sys.path.append(spark_path + "/python/pyspark/")
sys.path.append(spark_path + "/python/lib")
sys.path.append(spark_path + "/python/lib/pyspark.zip")
sys.path.append(spark_path + "/python/lib/py4j-0.10.9-src.zip")

import findspark
findspark.init()
import pyspark

In [2]:
number_cores = 6
memory_gb = 16
conf = (pyspark.SparkConf().setMaster('local[{}]'.format(number_cores)).set("spark.driver.maxResultSize", "5g"))
sc = pyspark.SparkContext(conf=conf)

In [3]:
!dir /users/trush/CSC496/Labs/Lab4/data

yelp_academic_dataset_business.json.gz	yelp_academic_dataset_user.json.gz
yelp_academic_dataset_review.json.gz


In [4]:
!ls -lh /users/trush/CSC496/Labs/Lab4/data

total 4.2G
-rw-r--r-- 1 trush PDC-edu-Lab  25M Nov 29 06:12 yelp_academic_dataset_business.json.gz
-rw-r--r-- 1 trush PDC-edu-Lab 2.4G Nov 29 06:18 yelp_academic_dataset_review.json.gz
-rw-r--r-- 1 trush PDC-edu-Lab 1.8G Nov 29 06:16 yelp_academic_dataset_user.json.gz


# 1. Identify 100 users with highest number of ratings/fans.  

In [5]:
# Step 1: Load the data using SQL Context

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [6]:
# get SQL table
df_json = sqlContext.read.json("/users/trush/CSC496/Labs/Lab4/data/yelp_academic_dataset_user.json.gz")

In [7]:
#df_json.printSchema()

In [8]:
df_json.registerTempTable("tbl_json")

  - PySpark way

In [9]:
df_data = sc.textFile("/users/trush/CSC496/Labs/Lab4/data/yelp_academic_dataset_user.json.gz")
print(df_data.count())
#df_data.take(1)

1968703


#### Step 2:
  - Get the information using SQL Statement

In [10]:
#df_json.printSchema()

In [11]:
# 100 users with highest number of ratings/fans.
highest_fan = sqlContext.sql("SELECT user_id, review_count, fans FFuser_id, review_count, fans FROM tbl_json ORDER BY fans DESC LIMIT 100")

In [12]:
#highest_fan.show()

  - Get the information using PySpark Statement

In [13]:
#df_data.take(1)

In [14]:
import json

df_top_users1 = df_data.map(lambda x: json.loads(x)).map(lambda x: (x["user_id"], x["review_count"], x["fans"]))
#df_top_users1.take(1)

In [15]:
def extract_user(x):
    x_json = json.loads(x)
    return (x_json["user_id"], x_json["review_count"], x_json["fans"])

#tmp = df_data.take(1)
#extract_user(tmp[0])

In [16]:
df_top_users2 = df_data.map(extract_user).takeOrdered(100, lambda x: -x[2])
#df_top_users2

In [17]:
#highest_fan.show()

# 2. Extract the reviews of these users and combine it with the business information. 

## a. Are they continental, regional, or local eaters?  

In [18]:
df_reviews = sqlContext.read.json("/users/trush/CSC496/Labs/Lab4/data/yelp_academic_dataset_review.json.gz")
df_business = sqlContext.read.json("/users/trush/CSC496/Labs/Lab4/data/yelp_academic_dataset_business.json.gz")

In [19]:
df_reviews.registerTempTable("df_reviews")
df_business.registerTempTable("df_business")

In [20]:
business_locations = sqlContext.sql("SELECT user_id, latitude, longitude FROM df_reviews INNER JOIN df_business ON df_reviews.business_id=df_business.business_id")

In [21]:
# FILTER business_locations TO ONLY INCLUDE TOP 100 USERS
#top_business_locations = sqlContext.sql("SELECT user_id, latitude, longitude FROM business_locations INNER JOIN highest_fan ON business_locations.user_id=highest_fan.user_id")

In [22]:
#df_business.printSchema()

In [23]:
#df_reviews.printSchema()

In [24]:
business_locations.show()

+--------------------+-------------+---------------+
|             user_id|     latitude|      longitude|
+--------------------+-------------+---------------+
|OwjRMXRC0KyPrIlcj...|36.1128958062|-115.1776370528|
|nIJD_7ZXHq-FX8byP...|   33.3483825|   -111.8591895|
|V34qejxNsCbcgD8C0...|   41.4851918|    -81.8001448|
|ofKDkJKXSKZXu5xJN...|36.1183433533|-115.3388214111|
|UgMW8bLE0QMJDCkQ1...|    36.035068|    -115.046353|
|5vD2kmE25YBrbayKh...|    36.059969|    -115.036032|
|aq_ZxGHiri48TUXJl...|33.3399621215|-111.8597268956|
|dsd-KNYKMpx6ma_sR...|   36.2618907|   -115.2563469|
|P6apihD4ASf1vpPxH...|    33.590695|    -111.786547|
|jOERvhmK6_lo_XGUB...|    33.302013|     -111.84192|
|s5j_CRBWDCCMDJ6r7...|    36.123384|   -115.2085667|
|HJECayULRM-6xh2GC...|    43.648896|     -79.604506|
|1YIQGP-a534nyksaw...|   36.0510284|   -115.1829387|
|qftVgPj_kRTildMDj...|   33.3794154|   -111.8053095|
|5lb0POg2t-AkMFx66...|   40.1164204|    -88.2433829|
|TF4C-F5iqavACQgKT...|   36.2684007|   -115.29

In [25]:
#top_business_locations.show()

__Distance between 2 points lat, lon__

d=2*asin(sqrt((sin((lat1-lat2)/2))^2 + cos(lat1)*cos(lat2)*(sin((lon1-lon2)/2))^2))

https://edwilliams.org/avform147.htm

In [26]:
import math
def distance(lat1, lon1, lat2, lon2):
    return 2*math.asin(math.sqrt((math.sin((lat1-lat2)/2))**2 + math.cos(lat1)*math.cos(lat2)*(math.sin((lon1-lon2)/2))**2))

In [27]:
distance(36.1128958062, -115.1776370528, 33.3483825, -111.8591895)

2.795102054155977

In [28]:
locations = business_locations.rdd

In [29]:
locations1 = locations.map(lambda x: (x[0],[x[1], x[2]]))

In [30]:
locations1.take(20)

[('OwjRMXRC0KyPrIlcjaXeFQ', [36.1128958062, -115.1776370528]),
 ('nIJD_7ZXHq-FX8byPMOkMQ', [33.3483825, -111.8591895]),
 ('V34qejxNsCbcgD8C0HVk-Q', [41.4851918, -81.8001448]),
 ('ofKDkJKXSKZXu5xJNGiiBQ', [36.1183433533, -115.3388214111]),
 ('UgMW8bLE0QMJDCkQ1Ax5Mg', [36.035068, -115.046353]),
 ('5vD2kmE25YBrbayKhykNxQ', [36.059969, -115.036032]),
 ('aq_ZxGHiri48TUXJlpRkCQ', [33.3399621215, -111.8597268956]),
 ('dsd-KNYKMpx6ma_sRWCSkQ', [36.2618907, -115.2563469]),
 ('P6apihD4ASf1vpPxHODxAQ', [33.590695, -111.786547]),
 ('jOERvhmK6_lo_XGUBPws_w', [33.302013, -111.84192]),
 ('s5j_CRBWDCCMDJ6r7AYqjQ', [36.123384, -115.2085667]),
 ('HJECayULRM-6xh2GCCvLiA', [43.648896, -79.604506]),
 ('1YIQGP-a534nyksaweEFYA', [36.0510284, -115.1829387]),
 ('qftVgPj_kRTildMDjwiqtg', [33.3794154, -111.8053095]),
 ('5lb0POg2t-AkMFx6603cYQ', [40.1164204, -88.2433829]),
 ('TF4C-F5iqavACQgKTrbjRA', [36.2684007, -115.2956118]),
 ('2hRe26HSCAWbFRn5WChK-Q', [36.109706, -115.154021]),
 ('6sJN_HlM_uwpfLJ1puf1Fg', [4

In [31]:
user_latlon = locations1.groupByKey().mapValues(list)

In [32]:
user_latlon.take(1)

[('PWJ1Q4DUzJO_XL3xU8XtYA', [[33.581606, -111.878578]])]

In [33]:
def get_distances(x):
    distances = []
    for i in range(len(x[1])):
        for j in range(len(x[1])):
            distances.append(distance(x[1][i][0], x[1][i][1], x[1][j][0], x[1][j][1]))
    return (x[0], distances)

In [34]:
get_distances(('jbOJV077QCGzduE6zFYO_A',
  [[36.1023786, -115.1745465],
   [36.1351760705, -115.427117132],
   [36.1429464, -115.1749359],
   [36.0657173614, -115.4354310036],
   [36.1585998893, -115.2042446423],
   [36.1270336, -115.2098191],
   [33.42028, -111.8396342],
   [36.102917, -115.17],
   [36.1435299, -115.1469733],
   [36.117397, -115.1757521],
   [36.1182917, -115.1725932],
   [36.086209, -115.13706],
   [33.4154053, -111.8329174],
   [36.112503, -115.061928],
   [36.1564182, -115.2077663],
   [33.4509853, -111.7702695],
   [36.1019817648, -115.1738418651],
   [33.436137, -111.716897],
   [36.159706, -115.245078],
   [36.0966823101, -115.1758075725],
   [36.1740973156, -115.1451098718],
   [36.1201403, -115.1543617],
   [36.1213235673, -115.1749563217],
   [36.1584751, -115.1265495],
   [33.452261, -111.82834],
   [36.093883, -115.176202]]))

('jbOJV077QCGzduE6zFYO_A',
 [0.0,
  0.03262487377200505,
  0.04056779929070732,
  0.038129597939368334,
  0.05621512661048792,
  0.02465583880443593,
  2.7334607814753014,
  0.000550897935600806,
  0.04114765428817944,
  0.01501841370229729,
  0.015913131164942716,
  0.01621696348611009,
  2.738299037656287,
  0.010377807770786566,
  0.05403215587728947,
  2.702329518691445,
  0.00039726215954153824,
  2.716759444313038,
  0.0572920685150961,
  0.005696404397601083,
  0.07171153952274423,
  0.01776413151415443,
  0.018944968103816764,
  0.056080434356563624,
  2.701421131568569,
  0.008495744008577407,
  0.03262487377200505,
  0.0,
  0.008168690266101186,
  0.06945849538697008,
  0.023642169900003306,
  0.008117030627674372,
  2.701836417356956,
  0.03208100535016867,
  0.008827484725561663,
  0.017646165122769595,
  0.016752637018982063,
  0.04871999340512156,
  2.7067307436347035,
  0.022355315247906807,
  0.021458491887554353,
  2.6713491225221495,
  0.03302020230903977,
  2.6863835

In [35]:
user_distances = user_latlon.map(lambda x: get_distances(x))

In [36]:
#user_distances.take(1)

In [37]:
import statistics
def get_avg_dist(x):
    distances = []
    for i in range(len(x[1])):
        for j in range(len(x[1])):
            distances.append(distance(x[1][i][0], x[1][i][1], x[1][j][0], x[1][j][1]))
    return (x[0], statistics.mean(distances))

In [41]:
# Return triplet of number of local, regional, and continental reviews
def get_dist_classification(x):
    distances = []
    for i in range(len(x[1])):
        for j in range(len(x[1])):
            distances.append(distance(x[1][i][0], x[1][i][1], x[1][j][0], x[1][j][1]))
    max_dist = max(distances)
    if max_dist >= 0 and max_dist < 0.10:
        return (x[0], "local")
    elif max_dist >= 0.10 and max_dist < 0.50:
        return (x[0], "regional")
    elif max_dist >= 0.50:
        return (x[0], "continental")

In [42]:
get_avg_dist(('jbOJV077QCGzduE6zFYO_A',
  [[36.1023786, -115.1745465],
   [36.1351760705, -115.427117132],
   [36.1429464, -115.1749359],
   [36.0657173614, -115.4354310036],
   [36.1585998893, -115.2042446423],
   [36.1270336, -115.2098191],
   [33.42028, -111.8396342],
   [36.102917, -115.17],
   [36.1435299, -115.1469733],
   [36.117397, -115.1757521],
   [36.1182917, -115.1725932],
   [36.086209, -115.13706],
   [33.4154053, -111.8329174],
   [36.112503, -115.061928],
   [36.1564182, -115.2077663],
   [33.4509853, -111.7702695],
   [36.1019817648, -115.1738418651],
   [33.436137, -111.716897],
   [36.159706, -115.245078],
   [36.0966823101, -115.1758075725],
   [36.1740973156, -115.1451098718],
   [36.1201403, -115.1543617],
   [36.1213235673, -115.1749563217],
   [36.1584751, -115.1265495],
   [33.452261, -111.82834],
   [36.093883, -115.176202]]))

('jbOJV077QCGzduE6zFYO_A', 0.8597743089261034)

In [43]:
get_dist_classification(('jbOJV077QCGzduE6zFYO_A',
  [[36.1023786, -115.1745465],
   [36.1351760705, -115.427117132],
   [36.1429464, -115.1749359],
   [36.0657173614, -115.4354310036],
   [36.1585998893, -115.2042446423],
   [36.1270336, -115.2098191],
   [33.42028, -111.8396342],
   [36.102917, -115.17],
   [36.1435299, -115.1469733],
   [36.117397, -115.1757521],
   [36.1182917, -115.1725932],
   [36.086209, -115.13706],
   [33.4154053, -111.8329174],
   [36.112503, -115.061928],
   [36.1564182, -115.2077663],
   [33.4509853, -111.7702695],
   [36.1019817648, -115.1738418651],
   [33.436137, -111.716897],
   [36.159706, -115.245078],
   [36.0966823101, -115.1758075725],
   [36.1740973156, -115.1451098718],
   [36.1201403, -115.1543617],
   [36.1213235673, -115.1749563217],
   [36.1584751, -115.1265495],
   [33.452261, -111.82834],
   [36.093883, -115.176202]]))

('jbOJV077QCGzduE6zFYO_A', 'continental')

In [44]:
user_avg_dist = user_latlon.map(lambda x: get_avg_dist(x))

In [45]:
#user_avg_dist.take(10)

In [46]:
user_dist_classification = user_latlon.map(lambda x: get_dist_classification(x))

In [47]:
user_dist_classification.take(1)

[('cJshXISsMBDVIiUO_mCl1A', 'regional')]

## b. Is there a preference in restaurant/food style of their reviews? 

In [48]:
reviewed_businesses = sqlContext.sql("SELECT user_id, business_id FROM df_reviews")

In [49]:
reviewed_businesses.show()

+--------------------+--------------------+
|             user_id|         business_id|
+--------------------+--------------------+
|OwjRMXRC0KyPrIlcj...|-MhfebM0QIsKt87iD...|
|nIJD_7ZXHq-FX8byP...|lbrU8StCq3yDfr-QM...|
|V34qejxNsCbcgD8C0...|HQl28KMwrEKHqhFrr...|
|ofKDkJKXSKZXu5xJN...|5JxlZaqCnk1MnbgRi...|
|UgMW8bLE0QMJDCkQ1...|IS4cv902ykd8wj1TR...|
|5vD2kmE25YBrbayKh...|nlxHRv1zXGT0c0K51...|
|aq_ZxGHiri48TUXJl...|Pthe4qk5xh4n-ef-9...|
|dsd-KNYKMpx6ma_sR...|FNCJpSn0tL9iqoY3J...|
|P6apihD4ASf1vpPxH...|e_BiI4ej1CW1F0EyV...|
|jOERvhmK6_lo_XGUB...|Ws8V970-mQt2X9CwC...|
|s5j_CRBWDCCMDJ6r7...|PA61Rwk3AMwOEXHev...|
|HJECayULRM-6xh2GC...|l-nL4BmhzpZjcavoo...|
|1YIQGP-a534nyksaw...|Naa6E0YU0Wr7jCuCE...|
|qftVgPj_kRTildMDj...|Ns4tjgLfqR1qawGlN...|
|5lb0POg2t-AkMFx66...|ZlCSsWS07JulSBIQl...|
|TF4C-F5iqavACQgKT...|7Ka9Pd8X9SRHs1D5E...|
|2hRe26HSCAWbFRn5W...|d4qwVw4PcN-_2mK2o...|
|6sJN_HlM_uwpfLJ1p...|oVuZtlCFg_zF090Nh...|
|kMkWON2lmw0s-M-fw...|_iGvLfEsqDwPUxRUA...|
|QodunSzok4nIYFNrT...|poSV39UqEg

In [50]:
business_styles = sqlContext.sql("SELECT user_id, categories FROM df_reviews INNER JOIN df_business ON df_reviews.business_id=df_business.business_id")

In [51]:
business_styles.show()

+--------------------+--------------------+
|             user_id|          categories|
+--------------------+--------------------+
|OwjRMXRC0KyPrIlcj...|Shopping, Arts & ...|
|nIJD_7ZXHq-FX8byP...|Beauty & Spas, Ha...|
|V34qejxNsCbcgD8C0...|Restaurants, Gast...|
|ofKDkJKXSKZXu5xJN...|Restaurants, Mexican|
|UgMW8bLE0QMJDCkQ1...|Fast Food, Restau...|
|5vD2kmE25YBrbayKh...|Restaurants, Deli...|
|aq_ZxGHiri48TUXJl...|   Restaurants, Thai|
|dsd-KNYKMpx6ma_sR...|Doctors, Cosmetic...|
|P6apihD4ASf1vpPxH...|Restaurants, Italian|
|jOERvhmK6_lo_XGUB...|Home & Garden, Re...|
|s5j_CRBWDCCMDJ6r7...|Local Services, S...|
|HJECayULRM-6xh2GC...|Specialty Food, R...|
|1YIQGP-a534nyksaw...|Home Services, Fu...|
|qftVgPj_kRTildMDj...|Asian Fusion, Fas...|
|5lb0POg2t-AkMFx66...|Airport Shuttles,...|
|TF4C-F5iqavACQgKT...|Contractors, Pool...|
|2hRe26HSCAWbFRn5W...|Mexican, Restaurants|
|6sJN_HlM_uwpfLJ1p...|Restaurants, Indi...|
|kMkWON2lmw0s-M-fw...|Automotive, Car D...|
|QodunSzok4nIYFNrT...|Restaurant

In [52]:
user_reviewed_businesses = reviewed_businesses.rdd.groupByKey().mapValues(list)

In [53]:
#user_reviewed_businesses.take(1)

In [54]:
user_styles = business_styles.rdd.groupByKey().mapValues(list)

In [55]:
user_styles.take(10)

[('Tpq5d4yEQRX2tgt3tWntoQ',
  ['Food, American (Traditional), Restaurants, Bakeries, Southern, Soul Food',
   'Nightlife, Restaurants, Food, American (New), Lounges, Bars',
   'Restaurants, Barbeque, American (Traditional)',
   'Restaurants, Food, American (New), French, Brasseries, Beer, Wine & Spirits',
   'Pakistani, Nightlife, Wine Bars, Indian, Bars, Restaurants',
   'American (Traditional), Restaurants, Sandwiches, American (New)',
   'Seafood, Steakhouses, Wine Bars, Nightlife, American (Traditional), Restaurants, Bars',
   'Tex-Mex, Mexican, Restaurants',
   'Buffets, Japanese, Chinese, Asian Fusion, Korean, Food, Restaurants, Food Delivery Services',
   'Data Recovery, Local Services, IT Services & Computer Repair, Security Systems, Home Theatre Installation, Home Services',
   'Italian, Restaurants',
   'American (New), Restaurants, American (Traditional)',
   'Arts & Entertainment, Wine Bars, Bars, Tapas Bars, Restaurants, Nightlife',
   'Mexican, Restaurants',
   'Specialty

In [56]:
def get_styles(x):
    if x[1] != []:
        styles = []
        for i in range(len(x[1])):
            y = x[1][i].split(", ")
            for j in range(len(y)):
                styles.append(y[j])
        return (x[0], styles)
    return ('-', [])
    #return (x[0], [])

In [57]:
get_styles(('Lnwip57QIhwr81NhubGMgQ',
  ['Travel Services, Tours, Airport Shuttles, Transportation, Hotels & Travel']))

('Lnwip57QIhwr81NhubGMgQ',
 ['Travel Services',
  'Tours',
  'Airport Shuttles',
  'Transportation',
  'Hotels & Travel'])

In [58]:
user_styles1 = user_styles.map(lambda x: get_styles(x))

In [59]:
# Get count of each style instead of FrequentItemsets
user_styles1.take(10)

[('BPh-OMqPul6HXsnCHxsk6g',
  ['Bars',
   'Sports Bars',
   'Nightlife',
   'Sandwiches',
   'Restaurants',
   'Breakfast & Brunch',
   'American (New)',
   'Bakeries',
   'Cafes',
   'Restaurants',
   'Food',
   'Sandwiches',
   'Breakfast & Brunch',
   'Soup',
   'Beer Bar',
   'Pubs',
   'Bars',
   'Restaurants',
   'American (Traditional)',
   'Nightlife',
   'Beer Gardens',
   'Burgers',
   'Sandwiches',
   'Specialty Food',
   'Restaurants',
   'Food',
   'Cheese Shops',
   'Imported Food',
   'Delis',
   'Ethnic Food',
   'Fast Food',
   'Diners',
   'Restaurants',
   'Poutineries',
   'Specialty Food',
   'Food',
   'Desserts',
   'Candy Stores']),
 ('ngS7nhw10RJcxsIH7lBonw',
  ['Nightlife',
   'Bars',
   'Wine Bars',
   'Seafood',
   'French',
   'Restaurants',
   'Breakfast & Brunch',
   'Hotels',
   'Event Planning & Services',
   'Hotels & Travel',
   'Venues & Event Spaces',
   'Restaurants',
   'Food',
   'Cafes',
   'Coffee & Tea',
   'Bakeries',
   'Restaurants',
   'Sh

In [60]:
from pyspark.ml.fpm import FPGrowth
from pyspark.sql import SparkSession

spark = SparkSession(sc)
df_FP = spark.createDataFrame(user_styles1, ["user_id","styles"])
df_FP.take(5)

[Row(user_id='phqw923eSkcwvoNca5ZSxw', styles=['Restaurants', 'Pakistani', 'Indian']),
 Row(user_id='cpG9KsQSYpKRx8fp4ZzQ7Q', styles=['Home Cleaning', 'Home Services']),
 Row(user_id='1No5nPT1uG_iSN8ywPmrvQ', styles=['Home Services', 'Knife Sharpening', 'Shopping', 'Kitchen & Bath', 'Local Services', 'Home & Garden', 'Sushi Bars', 'Restaurants', 'Japanese', 'Seafood Markets', 'Food', 'Specialty Food', 'Italian', 'Restaurants', 'American (New)', 'Nightlife', 'Pizza', 'Lounges', 'Cocktail Bars', 'Bars', 'Soup', 'Noodles', 'Japanese', 'Restaurants', 'Food', 'Ramen', 'Ethnic Food', 'Specialty Food', 'Restaurants', 'Sushi Bars', 'Japanese', 'Motorcycle Repair', 'Automotive', 'Motorcycle Dealers', 'Auto Parts & Supplies', 'Motorcycle Parts & Supplies', 'Japanese', 'Gluten-Free', 'Restaurants', 'Sushi Bars', 'Bars', 'Nightlife', 'Food', 'Asian Fusion', 'Lounges']),
 Row(user_id='V7uS5US4oTf-S9u36HJQCQ', styles=['Local Services', 'Thai', 'Restaurants', 'Restaurants', 'Restaurants', 'Bars', 'Ni

In [61]:
minSupport = 120 / df_FP.count()
#print(minSupport)
fpGrowth = FPGrowth(itemsCol="styles", minSupport=minSupport, minConfidence=0.6)
model = fpGrowth.fit(df_FP)

# Display frequent itemsets.
model.freqItemsets.show()

Py4JJavaError: An error occurred while calling o200.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 26.0 failed 1 times, most recent failure: Lost task 0.0 in stage 26.0 (TID 21, pcvm605-1.emulab.net, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark-3.0.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/opt/spark-3.0.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/opt/spark-3.0.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/spark-3.0.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-58-3c3845bb19f2>", line 1, in <lambda>
  File "<ipython-input-56-3062fc6ffaaa>", line 5, in get_styles
AttributeError: 'NoneType' object has no attribute 'split'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:385)
	at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:2981)
	at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:2980)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2980)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark-3.0.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/opt/spark-3.0.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/opt/spark-3.0.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/spark-3.0.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-58-3c3845bb19f2>", line 1, in <lambda>
  File "<ipython-input-56-3062fc6ffaaa>", line 5, in get_styles
AttributeError: 'NoneType' object has no attribute 'split'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


## c. Can you infer the locations of these users?

# 3. Identify one of your favorite restaurants that is available on Yelp. Search for all reviews and reviewers for this restaurants. 

- Is this restaurant frequented by non-local reviewers (how do you know)?
- What are the positive things about this restaurant (study higher-rated reviews)
- What are the negative things about this restaurant (study lower-rated reviews)

In [None]:
df_business_data = sc.textFile("/users/trush/CSC496/Labs/Lab4/data/yelp_academic_dataset_business.json.gz")
#print(df_business_data.count())
df_business_data.take(1)

In [None]:
df_restaurants = df_business_data.map(lambda x: (json.loads(x)["name"]))

In [None]:
df_restaurants.distinct().collect()

In [None]:
df_KFC = df_business_data.filter(lambda x: json.loads(x)["name"] == 'KFC')

In [None]:
#df_KFC.collect()

## KFC

In [None]:
df_KFC_reviews = sqlContext.sql("SELECT user_id, text FROM df_reviews WHERE df_reviews.business_id='ypILNgy7QFskKAdcPKB2RQ'")

In [None]:
df_KFC_reviews.collect()

In [62]:
# Just one location