In [1]:
import pyspark
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, create_map, lit
from pyspark.sql.functions import *
from pyspark.sql.types import StructType
from itertools import chain
from faker import Faker
import os


In [2]:
import findspark
findspark.find()

ValueError: Couldn't find Spark, make sure SPARK_HOME env is set or Spark is in an expected location (e.g. from homebrew installation).

In [3]:
memory = '4g'
pyspark_submit_args = ' --driver-memory ' + memory + ' pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

In [4]:
sc = pyspark.SparkContext(appName="temp")
sqlContext = SQLContext(sc)

sp = SparkSession.builder.config("spark.driver.memory", "15g").appName('ReviewsSpark').getOrCreate()

#### Function to Load Data

In [5]:
def load_business_data():
    path = "archive/yelp_academic_dataset_business.json"
    df = sqlContext.read.json(path, multiLine=False)
    return df

def load_review_data():
    path = "archive/yelp_filtnosamp_gt5.csv"
    df = sp.read.option("header",True).csv(path)
    return df

In [6]:
df = load_business_data()

df2 = load_review_data()

#### Show Data and Schema

In [10]:
df.show()

+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+-------+-------------+---------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|          city|               hours|is_open|     latitude|      longitude|                name|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+-------+-------------+---------------+--------------------+-----------+------------+-----+-----+
|        921 Pearl St|{null, null, 'bee...|6iYb2HFDywm3zjuRg...|Gastropubs, Food,...|       Boulder|{11:0-23:0, 11:0-...|      1|   40.0175444|   -105.2833481| Oskar Blues Taproom|      80302|          86|  4.0|   CO|
| 7000 NE Airport Way|{null, null, u'be...|tCbdrRPZA0oiIYSmH...|Salad, Soup, Sand...|      Portland|{5:0-18:0, 5:0-18...|      1

In [11]:
df.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

In [None]:
df2.show()

In [14]:
df2.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- userID: string (nullable = true)
 |-- itemID: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- name: string (nullable = true)



# Business Data Manipulation

#### (TODO) Flattens the Structs so that all the Nested Columns are Normal Columns

In [None]:
##To be done
def flatten_df(df, prefix=""):
    return df

In [None]:
# df = df.select(flatten_df(df))
# df.show()

#### Clean up the Data and Keep Required Rows

In [7]:
def clean_data(df):
    '''
    input: df a dataframe
    output: df a dataframe with the all the original columns
    '''
    
    # START YOUR CODE HERE ---------
    df = df.select(["name", "latitude", "longitude", "city", "review_count", "stars", "attributes", "categories"])
    

    # END YOUR CODE HERE -----------
    return df



In [8]:
df = clean_data(df)
df.show()

+--------------------+-------------+---------------+--------------+------------+-----+--------------------+--------------------+
|                name|     latitude|      longitude|          city|review_count|stars|          attributes|          categories|
+--------------------+-------------+---------------+--------------+------------+-----+--------------------+--------------------+
| Oskar Blues Taproom|   40.0175444|   -105.2833481|       Boulder|          86|  4.0|{null, null, 'bee...|Gastropubs, Food,...|
|Flying Elephants ...|45.5889058992|-122.5933307507|      Portland|         126|  4.0|{null, null, u'be...|Salad, Soup, Sand...|
|      The Reclaimory|45.5119069956|-122.6136928797|      Portland|          13|  4.5|{null, null, null...|Antiques, Fashion...|
|         Great Clips|   28.9144823|    -81.2959787|   Orange City|           8|  3.0|{null, null, null...|Beauty & Spas, Ha...|
|   Crossfit Terminus|   33.7470274|    -84.3534244|       Atlanta|          14|  4.0|{null, null

#### Filter Entries with Empty Ratings Field

Since with this, we see that there are no restaraunts with null ratings, we know that we do not need to filter out any rows from this dataframe.

In [9]:
df.filter(df.stars.isNull()).show()

+----+--------+---------+----+------------+-----+----------+----------+
|name|latitude|longitude|city|review_count|stars|attributes|categories|
+----+--------+---------+----+------------+-----+----------+----------+
+----+--------+---------+----+------------+-----+----------+----------+



#### Filter the Data so Only Data from Atlanta is contained (temporarily)

In [None]:
df = df.filter(df.city == "Atlanta")
df.show()

# Reviews Data Manipulation

In [10]:
fakeGen = Faker() 
users = df2.select("userID").distinct().coalesce(1)
fakeNameMap = {}
for user in users.rdd.toLocalIterator():
    fakeNameMap[user["userID"]] = fakeGen.name()
    
mapping_expr = create_map([lit(x) for x in chain(*fakeNameMap.items())])

df2 = df2.withColumn("name", mapping_expr.getItem(col("userID")))


In [11]:
df2.toPandas().to_csv('yelp_filtnosamp_gt5_withname.csv')

In [12]:
df2.show()

+---+--------------------+--------------------+------+-------------------+
|_c0|              userID|              itemID|rating|               name|
+---+--------------------+--------------------+------+-------------------+
|  0|ak0TdVmGKo4pwqdJS...|buF9druCkbuXLX526...|   4.0|      Rickey Miller|
|  1|ak0TdVmGKo4pwqdJS...|bNZ3-0rse12NKdSVq...|   4.0|      Rickey Miller|
|  2|ak0TdVmGKo4pwqdJS...|BVsIaKL-8QXVjt0Z9...|   4.0|      Rickey Miller|
|  3|ak0TdVmGKo4pwqdJS...|4MClvr12OXBNvGu8h...|   5.0|      Rickey Miller|
|  4|ak0TdVmGKo4pwqdJS...|2vH58mhkEl8GdcDug...|   5.0|      Rickey Miller|
|  5|YoVfDbnISlW0f7abN...|RA4V8pr014UyUbDvI...|   4.0|Elizabeth Patterson|
|  6|YoVfDbnISlW0f7abN...|XJKy6GFNp0ALI7kU0...|   5.0|Elizabeth Patterson|
|  7|YoVfDbnISlW0f7abN...|7eclDcacLLle21r0R...|   1.0|Elizabeth Patterson|
|  8|YoVfDbnISlW0f7abN...|xG8nb41yshMoYYSwe...|   5.0|Elizabeth Patterson|
|  9|YoVfDbnISlW0f7abN...|etxutRwLeNEtDmsL5...|   5.0|Elizabeth Patterson|
| 10|YoVfDbnISlW0f7abN...

In [13]:
name_list = df2.select("userID", "name").distinct()

In [14]:
name_list.show()

+--------------------+------------------+
|              userID|              name|
+--------------------+------------------+
|1hrlsEQ8l43jK4aXq...|Steven Matthews MD|
|p8rCTA139YIM6DQNq...|     Sarah Johnson|
|gO9Ie4xaS9A2OEP1O...|      Morgan Gates|
|CLISa_cfsE09NGNRV...|     Samuel Taylor|
|lSYuWv8KdZiHYIhFx...|Frederick Williams|
|tlOYWLfely6j68T9m...|     Charles Jones|
|cOS-MNurk0vTQf2QL...|        Susan Lane|
|8G5x4agMlY9BPyQAV...|   Katrina Fuentes|
|mTcy82Ll983oC4YXD...|      Philip Ramos|
|vWn_D48yeEbWpX49N...|   Melanie Wheeler|
|6AGD5YdMITBtjGsou...|     Richard Jones|
|VZiomHaM9olXlV7el...|   Michael Gilmore|
|v7gsMsxQrQlQ4-JCe...|       John Waller|
|4llwVVDnBkjDFx_3t...|    Darren Lindsey|
|t-zh2SipbKPc2KfE6...|       Seth Gibson|
|KjOLEj6uoZq8h01B4...|     Reginald Mann|
|inF2vQg7SGNHlgRXV...|        Gina Moore|
|8YSOoy6vauTKk4Tzx...|     Kevin Collins|
|xO7EjbuU9cVLl7KvV...|     Erika Ramirez|
|3qFzAjc6f7RPNVPCh...|     George Harris|
+--------------------+------------

In [15]:
name_list.toPandas().to_csv('users_list.csv')

In [16]:
business_df = df2.select("itemID").distinct().coalesce(1)
business_df.show()

+--------------------+
|              itemID|
+--------------------+
|66o2Fw42ZhGSuX1VF...|
|oLJWjd5VAkMbLU2e3...|
|_jw0beMekkOuCEGRx...|
|4QKuGnvjiPta_kk7J...|
|fSsdhoCC3FsXrSAAQ...|
|OuBUzqGj02xunvlIs...|
|bxy3khT-2R66tcdKj...|
|QAX9PI0-cAJN6x7rr...|
|6a8EOxICJtgzHViVm...|
|3ZVgig7uux9jVtEZn...|
|usrqG3sAANrQPvaHl...|
|Nqy2tJV3AGqW9Uil-...|
|cyvpFpmpN0YgDykuO...|
|Agq4zoNLSIpT1_ZJb...|
|czsrWGmQRDwP0tBid...|
|FbZLY5XASP9phBySt...|
|4SRTmovGJLmUgsfL8...|
|3gvHGMSHo4D8eXXSJ...|
|DZXp8m38R0s9U3Saj...|
|W4h9Tckj5WFJk1ve8...|
+--------------------+
only showing top 20 rows



In [17]:
inner_join = business_df.join(df, business_df.itemID == df.business_id)

AttributeError: 'DataFrame' object has no attribute 'business_id'

In [None]:
inner_join.show()

In [None]:
inner_join.printSchema()

In [None]:
select_a = ["business_id", "postal_code", "longitude", "latitude", "name", "state", "stars", "address", "city"]
business_df = inner_join.select(select_a)
business_df.show()

In [None]:
business_df.toPandas().to_csv('business_list.csv')