#### Yelp dataset exploration
Due to the data size is big and cannot fit into the memory, we would need to use Spark to explore the data.

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format,to_timestamp,col


In [3]:
spark = SparkSession \
    .builder \
    .appName("Spark Exploration") \
    .getOrCreate()


In [3]:
yelp_data = spark.read.json("yelp_data/yelp_academic_dataset_business.json")

In [4]:
#Huge data...
yelp_data.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

In [29]:
#show one record to see
yelp_data.show(1)

+------------+--------------------+--------------------+--------------------+-------+--------------------+-------+----------+------------+-------------------+-----------+------------+-----+-----+
|     address|          attributes|         business_id|          categories|   city|               hours|is_open|  latitude|   longitude|               name|postal_code|review_count|stars|state|
+------------+--------------------+--------------------+--------------------+-------+--------------------+-------+----------+------------+-------------------+-----------+------------+-----+-----+
|921 Pearl St|[,, 'beer_and_win...|6iYb2HFDywm3zjuRg...|Gastropubs, Food,...|Boulder|[11:0-23:0, 11:0-...|      1|40.0175444|-105.2833481|Oskar Blues Taproom|      80302|          86|  4.0|   CO|
+------------+--------------------+--------------------+--------------------+-------+--------------------+-------+----------+------------+-------------------+-----------+------------+-----+-----+
only showing top 1 r

In [15]:
yelp_data.createOrReplaceTempView("business")

In [21]:
#how many entries are restaurant related
spark.sql("SELECT count(*) /160585 from business where lower(categories) like '%restaurant%'").collect()

[Row((CAST(count(1) AS DOUBLE) / CAST(160585 AS DOUBLE))=0.3162997789332752)]

In [4]:
yelp_checkin = spark.read.json("yelp_data/yelp_academic_dataset_checkin.json")

In [5]:
yelp_checkin.printSchema()
yelp_checkin.createOrReplaceTempView("checkin")

root
 |-- business_id: string (nullable = true)
 |-- date: string (nullable = true)



In [10]:
#Wonderful have data from 2010 to 2021~~~
#use explode and split to put array into 
#yelp_checkin = spark.read.json("s3://dend-data-raw/yelp-data/yelp_academic_dataset_checkin.json")
#yelp_checkin.createOrReplaceTempView("checkin")
# You can print out the text to the console like so:
out_checkin = spark.sql("select business_id,explode(split(date,',')) as date from checkin")
out_checkin.createOrReplaceTempView("checkin_exploded")
out_checkin2 = spark.sql("select business_id,CAST(UNIX_TIMESTAMP(date, 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) from checkin_exploded limit 10")
print(out_checkin2.collect())

[Row(business_id='--0r8K_AQ4FZfLsX3ZYRDA', CAST(unix_timestamp(date, yyyy-MM-dd HH:mm:ss) AS TIMESTAMP)=datetime.datetime(2017, 9, 3, 17, 13, 59)), Row(business_id='--0zrn43LEaB4jUWTQH_Bg', CAST(unix_timestamp(date, yyyy-MM-dd HH:mm:ss) AS TIMESTAMP)=datetime.datetime(2010, 10, 8, 22, 21, 20)), Row(business_id='--0zrn43LEaB4jUWTQH_Bg', CAST(unix_timestamp(date, yyyy-MM-dd HH:mm:ss) AS TIMESTAMP)=datetime.datetime(2010, 11, 1, 21, 29, 14)), Row(business_id='--0zrn43LEaB4jUWTQH_Bg', CAST(unix_timestamp(date, yyyy-MM-dd HH:mm:ss) AS TIMESTAMP)=datetime.datetime(2010, 12, 23, 22, 55, 45)), Row(business_id='--0zrn43LEaB4jUWTQH_Bg', CAST(unix_timestamp(date, yyyy-MM-dd HH:mm:ss) AS TIMESTAMP)=datetime.datetime(2011, 4, 8, 17, 14, 59)), Row(business_id='--0zrn43LEaB4jUWTQH_Bg', CAST(unix_timestamp(date, yyyy-MM-dd HH:mm:ss) AS TIMESTAMP)=datetime.datetime(2011, 4, 11, 21, 28, 45)), Row(business_id='--0zrn43LEaB4jUWTQH_Bg', CAST(unix_timestamp(date, yyyy-MM-dd HH:mm:ss) AS TIMESTAMP)=datetime.

In [11]:
out_checkin2.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- CAST(unix_timestamp(date, yyyy-MM-dd HH:mm:ss) AS TIMESTAMP): timestamp (nullable = true)



In [None]:
#Some template script to create py
def main():

    # Create our Spark Session via a SparkSession builder
    spark = SparkSession.builder.appName("PySpark Example").getOrCreate()

    # Read in a file from S3 with the s3a file protocol
    # (This is a block based overlay for high performance supporting up to 5TB)
    yelp_checkin = spark.read.json("s3://dend-data-raw/yelp-data/yelp_academic_dataset_checkin.json")
    yelp_checkin.createOrReplaceTempView("checkin")
    # You can print out the text to the console like so:
    out_checkin = spark.sql("select * from checkin limit 1")
    print(out_checkin.collect())
    # The following code will format the loaded data into a CSV formatted file and save it back out to S3
    out_checkin.write.format("com.databricks.spark.csv").mode('overwrite').option("header", "true").save(
        path = "s3://dend-data-staging/yelp-data/")
    
    # Make sure to call stop() otherwise the cluster will keep running and cause problems for you
    spark.stop()
main()

In [23]:
yelp_user = spark.read.options(samplingRatio=1.0).json("yelp_data/yelp_academic_dataset_user.json")

In [24]:
yelp_user.printSchema()
yelp_user.createOrReplaceTempView("users")

root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)



In [None]:
spark.sql("select * from users").take(1)

In [12]:
#enriched = spark.sql("select *,TO_DATE(CAST(UNIX_TIMESTAMP(yelping_since, 'yyyy-MM-dd') AS TIMESTAMP)) as yelping_since_date from users")

In [26]:
#enriched.show()
#enriched.createOrReplaceTempView("enriched_users")

In [29]:
revised_users = spark.sql("select * from users").drop('friends')
revised_users = revised_users.withColumn("yelping_since",to_timestamp(col("yelping_since")))
revised_users.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: timestamp (nullable = true)



In [34]:
revised_users.write.option("header", "true").mode("overwrite").parquet("yelp_data/output/stg_users_friends") 

In [35]:
pq_users = spark.read.parquet("yelp_data/output/stg_users_friends")

In [36]:
pq_users.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: timestamp (nullable = true)



In [None]:
spark.sql("select _corrupt_record,user_id from users where _corrupt_record is null").show()

In [16]:
spark.sql("select user_id,explode(split(friends,',')) as friend_id from users").show()

+--------------------+--------------------+
|             user_id|           friend_id|
+--------------------+--------------------+
|q_QQ5kBBwlCcbL1s4...|xBDpTUbai0DXrvxCe...|
|q_QQ5kBBwlCcbL1s4...| 7GPNBO496aecrjJf...|
|q_QQ5kBBwlCcbL1s4...| gUfHciSP7BbxZd5g...|
|q_QQ5kBBwlCcbL1s4...| NXw0bCLF5ZtFMfhc...|
|q_QQ5kBBwlCcbL1s4...| OGjmMxPuIoLTJ3O-...|
|q_QQ5kBBwlCcbL1s4...| mwUcJP11UkIjCB8j...|
|q_QQ5kBBwlCcbL1s4...| fDmgb3Vi3-f_MtFO...|
|q_QQ5kBBwlCcbL1s4...| -x1516ZG5GllZiBj...|
|q_QQ5kBBwlCcbL1s4...| tx5UcfGrsud-CQdq...|
|q_QQ5kBBwlCcbL1s4...| HKooPGsHiZV_0vTn...|
|q_QQ5kBBwlCcbL1s4...| 2iSBJHVMNsolJ3AH...|
|q_QQ5kBBwlCcbL1s4...| NcoP47QP_eMVtoZv...|
|q_QQ5kBBwlCcbL1s4...| XPOx-mCubVGQ1rRk...|
|q_QQ5kBBwlCcbL1s4...| K6Tbv3a_qUQK0ed4...|
|q_QQ5kBBwlCcbL1s4...| _dUWTJf0faMXMdr_...|
|q_QQ5kBBwlCcbL1s4...| 5ni2bacPC7scIAHA...|
|q_QQ5kBBwlCcbL1s4...| cG-UHRz9QdhBEBz3...|
|q_QQ5kBBwlCcbL1s4...| BPh-OMqPul6HXsnC...|
|q_QQ5kBBwlCcbL1s4...| mgzNtI5XOuPwwukp...|
|q_QQ5kBBwlCcbL1s4...| pMbWlP0cA

In [None]:
spark.sql("select * from users limit 1").collect()

In [None]:
spark.sql("select average_stars,count(*) from users group by average_stars").collect()

In [21]:
def process_yelp_users(spark,input_data="",output_data ="yelp_data/output/"):
    """
    arg:  spark, inputdata,outputdata
    """
    #print(spark,input_data,output_data)
    yelp_checkin = spark.read.json(input_data+"yelp_data/yelp_academic_dataset_user.json")
    yelp_checkin.createOrReplaceTempView("users")
    yelp_users_ex_friends=spark.sql("select * from users").drop('friends')
    yelp_users_ex_friends = yelp_users_ex_friends.withColumn("yelping_since",to_timestamp(col("yelping_since"))) #convert varchar to timestamp
    #yelp_users_ex_friends=yelp_users_ex_friends.na.fill({'_corrupt_record': 'no_err'})
    yelp_users_friends = spark.sql("select user_id,explode(split(friends,',')) as friend_id from users")
    yelp_users_ex_friends.write.option("header", "true").mode("overwrite").parquet(output_data+"stg_users") 
    yelp_users_friends.write.option("header", "true").mode("overwrite").parquet(output_data+"stg_users_friends") 

In [22]:
process_yelp_users(spark)

In [25]:
parquetFile = spark.read.parquet("yelp_data/output/stg_users")

# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
parquetFile.printSchema()
#spark.sql("SELECT * from").show()

root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: timestamp (nullable = true)



In [28]:
spark.sql("SELECT * from parquetFile limit 10").collect()

[Row(average_stars=1.6, compliment_cool=0, compliment_cute=0, compliment_funny=0, compliment_hot=0, compliment_list=0, compliment_more=0, compliment_note=0, compliment_photos=0, compliment_plain=0, compliment_profile=0, compliment_writer=0, cool=0, elite='', fans=0, funny=1, name='John', review_count=15, useful=18, user_id='gRg1b1jMhyvATvTKqRK1bg', yelping_since=datetime.datetime(2011, 11, 1, 23, 15, 1)),
 Row(average_stars=1.0, compliment_cool=0, compliment_cute=0, compliment_funny=0, compliment_hot=0, compliment_list=0, compliment_more=0, compliment_note=0, compliment_photos=0, compliment_plain=0, compliment_profile=0, compliment_writer=0, cool=0, elite='', fans=0, funny=0, name='Barbie', review_count=2, useful=4, user_id='lssADo3u2Icm84siwkWzkw', yelping_since=datetime.datetime(2011, 2, 28, 2, 54, 11)),
 Row(average_stars=2.0, compliment_cool=0, compliment_cute=0, compliment_funny=0, compliment_hot=0, compliment_list=0, compliment_more=1, compliment_note=0, compliment_photos=0, comp

In [3]:
yelp_reviews = spark.read.json("yelp_data/yelp_academic_dataset_review.json")
yelp_reviews.printSchema()
yelp_reviews.createOrReplaceTempView("reviews")

root
 |-- _corrupt_record: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [None]:
spark.sql("select * from reviews limit 1").collect()