# Yelp Spark ETL

### Setting Up Spark

In [1]:
import os

import pyspark as ps
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType, ArrayType

In [2]:
# Build (or reuse) a local Spark session running on three worker threads.
# The PostgreSQL JDBC jar is added to the driver classpath; it is only needed
# by the JDBC writes near the end of the notebook.
session_builder = ps.sql.SparkSession.builder
session_builder = session_builder.appName("Yelp_ETL")
session_builder = session_builder.config(
    "spark.driver.extraClassPath", "/home/jovyan/postgresql-42.2.20.jar"
)
session_builder = session_builder.master('local[3]')
spark = session_builder.getOrCreate()

# Handle to the session's underlying SparkContext.
sc = spark.sparkContext

In [3]:
# Display the session object to confirm it was created.
spark

### Connecting To Data

In [4]:
# Directory that holds the original Yelp JSON files.
# Set the YELP_DATA_DIR environment variable to run the notebook somewhere
# other than the container it was developed in; when the variable is unset
# (or empty) the original path is used.
# os.path.join(..., "") guarantees a trailing separator, which the
# `data_location + filename_prefix + "<name>.json"` concatenations rely on.
data_location = os.path.join(
    os.environ.get("YELP_DATA_DIR")
    or "/home/jovyan/work/Documents/Data_Science_Projects/Yelp_Reviews/data/full_data/original_json/",
    "",
)

In [5]:
# Filename prefix shared by the four JSON files read below
# (checkin, user, business, review).
filename_prefix = "yelp_academic_dataset_"

### Checkin DataPrep

In [6]:
# Load the raw check-in records; no schema is supplied, so Spark infers it.
checkin_path = f"{data_location}{filename_prefix}checkin.json"
df_checkin = spark.read.json(checkin_path)

In [7]:
# Inspect the inferred schema of the raw check-in data.
df_checkin.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- date: string (nullable = true)



In [8]:
# Preview the first five rows (long values are truncated in the display).
df_checkin.show(5)

+--------------------+--------------------+
|         business_id|                date|
+--------------------+--------------------+
|--0r8K_AQ4FZfLsX3...| 2017-09-03 17:13:59|
|--0zrn43LEaB4jUWT...|2010-10-08 22:21:...|
|--164t1nclzzmca7e...|2010-02-26 02:06:...|
|--2aF9NhXnNVpDV0K...|2014-11-03 16:35:...|
|--2mEJ63SC_8_08_j...|2010-12-15 17:10:...|
+--------------------+--------------------+
only showing top 5 rows



In [9]:
# Number of rows in the raw check-in data.
df_checkin.count()

138876

In [10]:
# `date` is treated as one comma-separated string of check-in times:
# split it on commas and cast every piece to a timestamp.
checkin_timestamps = F.split(df_checkin["date"], ',').cast(ArrayType(TimestampType()))
df_checkin_2 = df_checkin.withColumn("date_array", checkin_timestamps)

In [11]:
# Confirm the new array<timestamp> column, then expose the frame to Spark SQL
# under the view name "df_checkin_2".
df_checkin_2.printSchema()
df_checkin_2.createOrReplaceTempView("df_checkin_2")

root
 |-- business_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- date_array: array (nullable = true)
 |    |-- element: timestamp (containsNull = true)



In [12]:
# Summarise each row's check-in array: its length plus the earliest and
# latest timestamp it contains.
checkin_summary_sql = '''
                                 SELECT business_id,
                                     size(date_array) AS num_checkins,
                                     to_timestamp(array_min(date_array)) AS checkin_min,
                                     to_timestamp(array_max(date_array)) AS checkin_max
                                 FROM df_checkin_2
                             '''
df_checkin_final = spark.sql(checkin_summary_sql)

In [13]:
# Preview the first twenty rows of the check-in summary.
df_checkin_final.show(20)

+--------------------+------------+-------------------+-------------------+
|         business_id|num_checkins|        checkin_min|        checkin_max|
+--------------------+------------+-------------------+-------------------+
|--0r8K_AQ4FZfLsX3...|           1|2017-09-03 17:13:59|2017-09-03 17:13:59|
|--0zrn43LEaB4jUWT...|           9|2010-10-08 22:21:20|2011-08-29 19:01:31|
|--164t1nclzzmca7e...|         311|2010-02-26 02:06:53|2013-11-25 02:51:33|
|--2aF9NhXnNVpDV0K...|           8|2014-11-03 16:35:35|2020-12-29 16:22:00|
|--2mEJ63SC_8_08_j...|           4|2010-12-15 17:10:46|2016-06-11 19:56:11|
|--6COJIAjkQwSUZci...|         180|2012-09-23 20:58:03|2017-08-20 17:41:05|
|--DzGwfuJH12DjYz9...|           2|2014-08-02 14:12:55|2015-02-21 17:37:57|
|--EoF6KmeDuki2vBW...|           2|2012-08-27 21:10:37|2012-09-21 22:35:50|
|--JKSSgnfoOjVDFGv...|           4|2011-08-15 22:43:08|2018-09-12 14:04:02|
|--JuLhLvq3gyjNnXT...|           1|2016-06-08 15:40:03|2016-06-08 15:40:03|
|--Q3mAcX9t6

In [14]:
# Check the column types of the check-in summary.
df_checkin_final.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- num_checkins: integer (nullable = false)
 |-- checkin_min: timestamp (nullable = true)
 |-- checkin_max: timestamp (nullable = true)



In [15]:
# df_checkin_final.coalesce(1).write.csv("checkin_post_etl.csv", header=True)

### User DataPrep

In [16]:
# Load the raw user records; no schema is supplied, so Spark infers it.
user_path = f"{data_location}{filename_prefix}user.json"
df_user = spark.read.json(user_path)

In [17]:
# Inspect the inferred schema of the raw user data.
df_user.printSchema()

root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)



In [18]:
# Number of rows in the raw user data.
df_user.count()

2189457

In [19]:
# Preview the first five raw user rows.
# NOTE: the raw table includes the `name` column, so it appears in this output.
df_user.show(5)

+-------------+---------------+---------------+----------------+--------------+---------------+---------------+---------------+-----------------+----------------+------------------+-----------------+-----+--------------------+----+--------------------+-----+---------+------------+------+--------------------+-------------------+
|average_stars|compliment_cool|compliment_cute|compliment_funny|compliment_hot|compliment_list|compliment_more|compliment_note|compliment_photos|compliment_plain|compliment_profile|compliment_writer| cool|               elite|fans|             friends|funny|     name|review_count|useful|             user_id|      yelping_since|
+-------------+---------------+---------------+----------------+--------------+---------------+---------------+---------------+-----------------+----------------+------------------+-----------------+-----+--------------------+----+--------------------+-----+---------+------------+------+--------------------+-------------------+
|         

In [20]:
# Expose the raw user frame to Spark SQL under the view name "df_user".
df_user.createOrReplaceTempView("df_user")

In [21]:
# Derive per-user features from the raw user records.
#   yelping_since : parsed to a timestamp.
#   elite_array   : `elite` split on commas. The literal '20,20' is first
#                   rewritten to '2020' (presumably the source file splits the
#                   year 2020 this way -- verify against the data).
#   friend_count  : number of comma-separated entries in `friends`.
#                   split('') returns [''] whose size is 1, so an empty value
#                   must be special-cased or a user with no friends is counted
#                   as having one. The Yelp dump is understood to write the
#                   literal 'None' for "no friends" (TODO confirm); both
#                   spellings map to 0. NULL `friends` still falls through to
#                   size(split(...)) unchanged.
#   compliments   : sum of the eleven compliment_* counters.
#   ufc_count     : cool + funny + useful.
df_user_1 = spark.sql('''
                          SELECT user_id,
                              to_timestamp(yelping_since) AS yelping_since,
                              split(replace(elite, '20,20', '2020'), ",") AS elite_array,
                              average_stars,
                              review_count,
                              fans,
                              CASE
                                  WHEN trim(friends) IN ('', 'None') THEN 0
                                  ELSE size(split(friends, ","))
                              END AS friend_count,
                              (compliment_cool + compliment_cute + compliment_funny
                              + compliment_hot + compliment_list + compliment_more
                              + compliment_note + compliment_photos + compliment_plain
                              + compliment_profile + compliment_writer) AS compliments,
                              (cool + funny + useful) AS ufc_count
                          FROM df_user
                      ''')

In [22]:
# Expose the intermediate user frame to Spark SQL under its own name.
df_user_1.createOrReplaceTempView("df_user_1")

# Collapse elite_array into a count plus its smallest and largest entry cast
# to int. An array holding only the empty string yields 0 for all three columns.
user_final_sql = '''
                              SELECT user_id,
                                  yelping_since,
                                  average_stars,
                                  review_count,
                                  fans,
                                  friend_count,
                                  compliments,
                                  ufc_count,
                                  CASE
                                      WHEN size(elite_array) = 1 AND element_at(elite_array, 1) = "" THEN 0
                                      ELSE size(elite_array)
                                  END AS elite_count,
                                  CASE
                                      WHEN size(elite_array) = 1 AND element_at(elite_array, 1) = "" THEN 0
                                      ELSE int(array_min(elite_array))
                                  END AS elite_min,
                                  CASE
                                      WHEN size(elite_array) = 1 AND element_at(elite_array, 1) = "" THEN 0
                                      ELSE int(array_max(elite_array))
                                  END AS elite_max
                              FROM df_user_1
                          '''
df_user_final = spark.sql(user_final_sql)

In [23]:
# Check the column types of the final user table.
df_user_final.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- yelping_since: timestamp (nullable = true)
 |-- average_stars: double (nullable = true)
 |-- review_count: long (nullable = true)
 |-- fans: long (nullable = true)
 |-- friend_count: integer (nullable = false)
 |-- compliments: long (nullable = true)
 |-- ufc_count: long (nullable = true)
 |-- elite_count: integer (nullable = false)
 |-- elite_min: integer (nullable = true)
 |-- elite_max: integer (nullable = true)



In [24]:
# Preview the first five rows of the final user table.
df_user_final.show(5)

+--------------------+-------------------+-------------+------------+----+------------+-----------+---------+-----------+---------+---------+
|             user_id|      yelping_since|average_stars|review_count|fans|friend_count|compliments|ufc_count|elite_count|elite_min|elite_max|
+--------------------+-------------------+-------------+------------+----+------------+-----------+---------+-----------+---------+---------+
|q_QQ5kBBwlCcbL1s4...|2005-03-14 20:26:35|         3.85|        1220|1357|        5813|      15694|    36359|          9|     2006|     2014|
|dIIKEfOgo0KqUfGQv...|2007-08-10 19:01:51|         4.09|        2136|1025|        6296|      11797|    49607|         14|     2007|     2020|
|D6ErcUnFALnCQN4b1...|2007-02-07 15:47:53|         3.76|         119|  16|         835|        117|      446|          2|     2010|     2011|
|JnPIjvC0cmooNDfsa...|2009-02-09 16:14:29|         3.77|         987| 420|        1452|      11190|    15991|          6|     2009|     2014|
|37Hc8

In [25]:
# df_user_final.coalesce(1).write.csv("user_post_etl.csv", header=True)

### Business DataPrep

In [26]:
# Load the raw business records; no schema is supplied, so Spark infers it.
business_path = f"{data_location}{filename_prefix}business.json"
df_business = spark.read.json(business_path)

In [27]:
# Inspect the inferred schema of the raw business data (includes a nested `attributes` struct).
df_business.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

In [28]:
# Number of rows in the raw business data.
df_business.count()

160585

In [29]:
# Preview the first five raw business rows.
df_business.show(5)

+-------------------+--------------------+--------------------+--------------------+-----------+--------------------+-------+-------------+---------------+--------------------+-----------+------------+-----+-----+
|            address|          attributes|         business_id|          categories|       city|               hours|is_open|     latitude|      longitude|                name|postal_code|review_count|stars|state|
+-------------------+--------------------+--------------------+--------------------+-----------+--------------------+-------+-------------+---------------+--------------------+-----------+------------+-----+-----+
|       921 Pearl St|[,, 'beer_and_win...|6iYb2HFDywm3zjuRg...|Gastropubs, Food,...|    Boulder|[11:0-23:0, 11:0-...|      1|   40.0175444|   -105.2833481| Oskar Blues Taproom|      80302|          86|  4.0|   CO|
|7000 NE Airport Way|[,, u'beer_and_wi...|tCbdrRPZA0oiIYSmH...|Salad, Soup, Sand...|   Portland|[5:0-18:0, 5:0-18...|      1|45.5889058992|-122.

In [30]:
# Expose the raw business frame to Spark SQL under the view name "df_business".
df_business.createOrReplaceTempView("df_business")

In [31]:
# Keep only the id, location, and rating/review-count columns of each business;
# the remaining raw columns are not selected.
business_final_sql = '''
                                  SELECT business_id,
                                      latitude,
                                      longitude,
                                      postal_code,
                                      state,
                                      stars,
                                      review_count
                                  FROM df_business
                              '''
df_business_final = spark.sql(business_final_sql)

In [32]:
# Check the column types of the final business table.
df_business_final.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- state: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- review_count: long (nullable = true)



In [33]:
# Preview the first five rows of the final business table.
df_business_final.show(5)

+--------------------+-------------+---------------+-----------+-----+-----+------------+
|         business_id|     latitude|      longitude|postal_code|state|stars|review_count|
+--------------------+-------------+---------------+-----------+-----+-----+------------+
|6iYb2HFDywm3zjuRg...|   40.0175444|   -105.2833481|      80302|   CO|  4.0|          86|
|tCbdrRPZA0oiIYSmH...|45.5889058992|-122.5933307507|      97218|   OR|  4.0|         126|
|bvN78flM8NLprQ1a1...|45.5119069956|-122.6136928797|      97214|   OR|  4.5|          13|
|oaepsyvc0J17qwi8c...|   28.9144823|    -81.2959787|      32763|   FL|  3.0|           8|
|PE9uqAjdw0E4-8mjG...|   33.7470274|    -84.3534244|      30316|   GA|  4.0|          14|
+--------------------+-------------+---------------+-----------+-----+-----+------------+
only showing top 5 rows



In [34]:
# df_business_final.coalesce(1).write.csv("business_post_etl.csv", header=True)

### Review DataPrep

In [35]:
# Load the raw review records; no schema is supplied, so Spark infers it.
review_path = f"{data_location}{filename_prefix}review.json"
df_review = spark.read.json(review_path)

In [36]:
# Inspect the inferred schema of the raw review data.
df_review.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [37]:
# Number of rows in the raw review data (compared against the joined table later).
df_review.count()

8635403

In [38]:
# Preview the first five raw review rows.
df_review.show(5)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|buF9druCkbuXLX526...|   1|2014-10-11 03:34:02|    1|lWC-xP3rd6obsecCY...|  4.0|Apparently Prides...|     3|ak0TdVmGKo4pwqdJS...|
|RA4V8pr014UyUbDvI...|   0|2015-07-03 20:38:25|    0|8bFej1QE5LXp4O05q...|  4.0|This store is pre...|     1|YoVfDbnISlW0f7abN...|
|_sS2LBIGNT5NQb6PD...|   0|2013-05-28 20:38:06|    0|NDhkzczKjLshODbqD...|  5.0|I called WVM on t...|     0|eC5evKn1TWDyHCyQA...|
|0AzLzHfOJgL7ROwhd...|   1|2010-01-08 02:29:15|    1|T5fAqjjFooT4V0OeZ...|  2.0|I've stayed at ma...|     1|SFQ1jcnGguO0LYWnb...|
|8zehGz9jnxPqXtOc7...|   0|2011-07-28 18:05:01|    0|sjm_uUcQVxab_EeLC...|  4.0|The food i

In [39]:
# Expose the raw review frame to Spark SQL under the view name "df_review".
df_review.createOrReplaceTempView("df_review")

In [40]:
# Build the review table with its two targets:
#   ufc_count_target = cool + funny + useful (computed in the inner query)
#   ufc_bool_target  = the string "True" when that sum is >= 1, otherwise "False"
# Note that the boolean target is stored as a string column, not a boolean.
review_final_sql = '''
                                SELECT review_id,
                                    to_timestamp(date) AS review_date,
                                    user_id,
                                    business_id,
                                    stars,
                                    text,
                                    ufc_total AS ufc_count_target,
                                CASE
                                    WHEN ufc_total >= 1 THEN "True"
                                    ELSE "False"
                                END AS ufc_bool_target
                                FROM (SELECT review_id,
                                          date,
                                          user_id,
                                          business_id,
                                          stars,
                                          text,
                                          cool + funny + useful AS ufc_total
                                      FROM df_review)
                            '''
df_review_final = spark.sql(review_final_sql)

In [41]:
# Check the column types of the final review table.
df_review_final.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- review_date: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- ufc_count_target: long (nullable = true)
 |-- ufc_bool_target: string (nullable = false)



In [42]:
# Preview the first twenty rows of the final review table, including both targets.
df_review_final.show(20)

+--------------------+-------------------+--------------------+--------------------+-----+--------------------+----------------+---------------+
|           review_id|        review_date|             user_id|         business_id|stars|                text|ufc_count_target|ufc_bool_target|
+--------------------+-------------------+--------------------+--------------------+-----+--------------------+----------------+---------------+
|lWC-xP3rd6obsecCY...|2014-10-11 03:34:02|ak0TdVmGKo4pwqdJS...|buF9druCkbuXLX526...|  4.0|Apparently Prides...|               5|           True|
|8bFej1QE5LXp4O05q...|2015-07-03 20:38:25|YoVfDbnISlW0f7abN...|RA4V8pr014UyUbDvI...|  4.0|This store is pre...|               1|           True|
|NDhkzczKjLshODbqD...|2013-05-28 20:38:06|eC5evKn1TWDyHCyQA...|_sS2LBIGNT5NQb6PD...|  5.0|I called WVM on t...|               0|          False|
|T5fAqjjFooT4V0OeZ...|2010-01-08 02:29:15|SFQ1jcnGguO0LYWnb...|0AzLzHfOJgL7ROwhd...|  2.0|I've stayed at ma...|               3|  

In [43]:
# df_review_final.coalesce(1).write.csv("review_post_etl.csv", header=True)

### Combining Data Tables

#### Create Temp Tables

In [44]:
# Register each cleaned table as a temp view named after its variable so the
# join query can reference all four.
final_tables = {
    "df_checkin_final": df_checkin_final,
    "df_user_final": df_user_final,
    "df_business_final": df_business_final,
    "df_review_final": df_review_final,
}
for view_name, table in final_tables.items():
    table.createOrReplaceTempView(view_name)

In [45]:
# Assemble one row per review: the review itself, LEFT JOINed to its author
# (user_*), its business (biz_*) and that business's check-in summary.
# Reviews without a match in a joined table keep NULLs in that table's columns.
all_data_sql = '''
                         SELECT r.review_id,
                             r.user_id,
                             r.business_id,
                             b.latitude AS biz_latitude,
                             b.longitude AS biz_longitude,
                             b.postal_code AS biz_postal_code,
                             b.state AS biz_state,
                             b.stars AS biz_avg_stars,
                             b.review_count AS biz_review_count,
                             c.num_checkins AS biz_checkin_count,
                             c.checkin_min AS biz_min_checkin_date,
                             c.checkin_max AS biz_max_checkin_date,
                             u.yelping_since AS user_yelping_since,
                             u.elite_count AS user_elite_count,
                             u.elite_min AS user_elite_min,
                             u.elite_max AS user_elite_max,
                             u.average_stars AS user_avg_stars,
                             u.review_count AS user_review_count,
                             u.fans AS user_fan_count,
                             u.friend_count AS user_friend_count,
                             u.compliments AS user_compliment_count,
                             u.ufc_count AS user_ufc_count,
                             r.review_date AS review_date,
                             r.stars AS review_stars,
                             r.text AS review_text,
                             r.ufc_count_target AS target_ufc_count,
                             r.ufc_bool_target AS target_ufc_bool
                         FROM df_review_final AS r
                         LEFT JOIN df_user_final AS u
                         ON r.user_id = u.user_id
                         LEFT JOIN df_business_final AS b
                         ON r.business_id = b.business_id
                         LEFT JOIN df_checkin_final AS c
                         ON r.business_id = c.business_id
                     '''
all_data = spark.sql(all_data_sql)

In [46]:
# Check the column names and types of the joined table.
all_data.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- biz_latitude: double (nullable = true)
 |-- biz_longitude: double (nullable = true)
 |-- biz_postal_code: string (nullable = true)
 |-- biz_state: string (nullable = true)
 |-- biz_avg_stars: double (nullable = true)
 |-- biz_review_count: long (nullable = true)
 |-- biz_checkin_count: integer (nullable = true)
 |-- biz_min_checkin_date: timestamp (nullable = true)
 |-- biz_max_checkin_date: timestamp (nullable = true)
 |-- user_yelping_since: timestamp (nullable = true)
 |-- user_elite_count: integer (nullable = true)
 |-- user_elite_min: integer (nullable = true)
 |-- user_elite_max: integer (nullable = true)
 |-- user_avg_stars: double (nullable = true)
 |-- user_review_count: long (nullable = true)
 |-- user_fan_count: long (nullable = true)
 |-- user_friend_count: integer (nullable = true)
 |-- user_compliment_count: long (nullable = true)
 |-- user_

In [47]:
# Sanity check: the LEFT JOINs start from the review table, so this should
# equal the review row count; a larger number would mean duplicate join keys.
all_data.count()

8635403

In [48]:
# Preview the first five rows of the joined table.
all_data.show(5)

+--------------------+--------------------+--------------------+------------+-------------+---------------+---------+-------------+----------------+-----------------+--------------------+--------------------+-------------------+----------------+--------------+--------------+--------------+-----------------+--------------+-----------------+---------------------+--------------+-------------------+------------+--------------------+----------------+---------------+
|           review_id|             user_id|         business_id|biz_latitude|biz_longitude|biz_postal_code|biz_state|biz_avg_stars|biz_review_count|biz_checkin_count|biz_min_checkin_date|biz_max_checkin_date| user_yelping_since|user_elite_count|user_elite_min|user_elite_max|user_avg_stars|user_review_count|user_fan_count|user_friend_count|user_compliment_count|user_ufc_count|        review_date|review_stars|         review_text|target_ufc_count|target_ufc_bool|
+--------------------+--------------------+--------------------+----

## Save All Data

In [49]:
# all_data.coalesce(1).write.csv("all_data.csv", header=True, sep=',')

In [50]:
# all_data.coalesce(1).write.json(path='all_data.json')

## Split Data

In [51]:
# Expose the joined table to Spark SQL under the view name "all_data".
# NOTE(review): no later cell in this notebook queries this view -- confirm it is still needed.
all_data.createOrReplaceTempView("all_data")

### Split Data Into Train, Test and Holdout Sets

In [52]:
# Set aside roughly 20% of all rows as a holdout set; the rest is the working
# set. The fixed seed keeps the split repeatable.
working_and_holdout = all_data.randomSplit([0.8, 0.2], seed=12345)
working_data, holdout_data = working_and_holdout

In [53]:
# Size of the working set (about 80% of all rows).
working_data.count()

6906371

In [54]:
# Size of the holdout set (about 20% of all rows).
holdout_data.count()

1729032

In [55]:
# Split the working set again, roughly 80/20, into train and test.
# NOTE(review): this reuses the seed of the working/holdout split; consider a
# different seed if the two draws are meant to be independent -- confirm.
train_and_test = working_data.randomSplit([0.8, 0.2], seed=12345)
train_data, test_data = train_and_test

In [56]:
# Size of the training set.
train_data.count()

5523992

In [57]:
# Size of the test set.
test_data.count()

1382379

### Split Data Into Text and Non-Text

In [58]:
# Register both splits as temp views named after their variables.
split_tables = {"train_data": train_data, "test_data": test_data}
for view_name, table in split_tables.items():
    table.createOrReplaceTempView(view_name)

In [59]:
# Columns kept in the text subset: the review id and star rating, the raw
# review text, and the two target columns.
text_columns = [
    "review_id",
    "review_stars",
    "review_text",
    "target_ufc_bool",
    "target_ufc_count",
]

# One shared column list replaces two copy-pasted queries, so the train and
# test projections cannot drift apart. The resulting columns and their order
# match the previous SELECT statements.
text_data_train = train_data.select(*text_columns)
text_data_test = test_data.select(*text_columns)

In [60]:
# Check the column types of the text training subset.
text_data_train.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- review_stars: double (nullable = true)
 |-- review_text: string (nullable = true)
 |-- target_ufc_bool: string (nullable = false)
 |-- target_ufc_count: long (nullable = true)



In [61]:
# Check the column types of the text test subset (should match the training subset).
text_data_test.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- review_stars: double (nullable = true)
 |-- review_text: string (nullable = true)
 |-- target_ufc_bool: string (nullable = false)
 |-- target_ufc_count: long (nullable = true)



In [62]:
# Report the row count of each text subset, train first and then test.
for split_label, split_frame in (("Train", text_data_train), ("Test", text_data_test)):
    print(f'Text {split_label} Records: {split_frame.count()}')

Text Train Records: 5523992
Text Test Records: 1382379


In [63]:
# Preview the first five rows of the text training subset.
text_data_train.show(5)

+--------------------+------------+--------------------+---------------+----------------+
|           review_id|review_stars|         review_text|target_ufc_bool|target_ufc_count|
+--------------------+------------+--------------------+---------------+----------------+
|---zlFD4Kgfatr0Sb...|         4.0|Been looking for ...|          False|               0|
|--BcxYRlOpG0v7nVQ...|         4.0|I visited Kyma la...|          False|               0|
|--KO46TSxWzv32x00...|         5.0|It might be the m...|          False|               0|
|--XNrIWxRUafMsGqz...|         5.0|Love this place! ...|           True|               1|
|--aGgQu9HVva6F9fB...|         4.0|Great salad and c...|          False|               0|
+--------------------+------------+--------------------+---------------+----------------+
only showing top 5 rows



In [64]:
# Preview the first five rows of the text test subset.
text_data_test.show(5)

+--------------------+------------+--------------------+---------------+----------------+
|           review_id|review_stars|         review_text|target_ufc_bool|target_ufc_count|
+--------------------+------------+--------------------+---------------+----------------+
|--p3d1axlnA7ka_p6...|         5.0|Oh, we LOVE this ...|          False|               0|
|-1v3W4XqQcIe44_I1...|         5.0|I didn't smell an...|           True|               5|
|-21y2QEKfhjxh2alg...|         5.0|Wow this place is...|           True|               1|
|-358vecdAUh6ECkNf...|         5.0|For a crepe, you ...|          False|               0|
|-3_NmlYMibrapNEnS...|         5.0|I've visited Kyma...|           True|               1|
+--------------------+------------+--------------------+---------------+----------------+
only showing top 5 rows



In [65]:
# Columns kept in the non-text subset: ids, review metadata, business
# features, user features, and the two target columns. review_text is left out.
# NOTE(review): user_ufc_count exists in the joined table but is not selected
# here -- confirm whether that omission is intentional.
non_text_columns = [
    "review_id",
    "user_id",
    "business_id",
    "review_stars",
    "review_date",
    "biz_avg_stars",
    "biz_review_count",
    "biz_checkin_count",
    "biz_max_checkin_date",
    "biz_min_checkin_date",
    "biz_latitude",
    "biz_longitude",
    "biz_postal_code",
    "biz_state",
    "user_avg_stars",
    "user_review_count",
    "user_friend_count",
    "user_fan_count",
    "user_compliment_count",
    "user_elite_count",
    "user_elite_max",
    "user_elite_min",
    "user_yelping_since",
    "target_ufc_bool",
    "target_ufc_count",
]

# One shared column list replaces two copy-pasted 25-column queries, so the
# train and test projections cannot drift apart. The resulting columns and
# their order match the previous SELECT statements.
non_text_data_train = train_data.select(*non_text_columns)
non_text_data_test = test_data.select(*non_text_columns)

In [66]:
# Check the column types of the non-text training subset.
non_text_data_train.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- review_stars: double (nullable = true)
 |-- review_date: timestamp (nullable = true)
 |-- biz_avg_stars: double (nullable = true)
 |-- biz_review_count: long (nullable = true)
 |-- biz_checkin_count: integer (nullable = true)
 |-- biz_max_checkin_date: timestamp (nullable = true)
 |-- biz_min_checkin_date: timestamp (nullable = true)
 |-- biz_latitude: double (nullable = true)
 |-- biz_longitude: double (nullable = true)
 |-- biz_postal_code: string (nullable = true)
 |-- biz_state: string (nullable = true)
 |-- user_avg_stars: double (nullable = true)
 |-- user_review_count: long (nullable = true)
 |-- user_friend_count: integer (nullable = true)
 |-- user_fan_count: long (nullable = true)
 |-- user_compliment_count: long (nullable = true)
 |-- user_elite_count: integer (nullable = true)
 |-- user_elite_max: integer (nullable = true)
 |-- user_elite_min:

In [67]:
# Check the column types of the non-text test subset (should match the training subset).
non_text_data_test.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- review_stars: double (nullable = true)
 |-- review_date: timestamp (nullable = true)
 |-- biz_avg_stars: double (nullable = true)
 |-- biz_review_count: long (nullable = true)
 |-- biz_checkin_count: integer (nullable = true)
 |-- biz_max_checkin_date: timestamp (nullable = true)
 |-- biz_min_checkin_date: timestamp (nullable = true)
 |-- biz_latitude: double (nullable = true)
 |-- biz_longitude: double (nullable = true)
 |-- biz_postal_code: string (nullable = true)
 |-- biz_state: string (nullable = true)
 |-- user_avg_stars: double (nullable = true)
 |-- user_review_count: long (nullable = true)
 |-- user_friend_count: integer (nullable = true)
 |-- user_fan_count: long (nullable = true)
 |-- user_compliment_count: long (nullable = true)
 |-- user_elite_count: integer (nullable = true)
 |-- user_elite_max: integer (nullable = true)
 |-- user_elite_min:

In [68]:
# Preview the first five rows of the non-text training subset.
non_text_data_train.show(5)

+--------------------+--------------------+--------------------+------------+-------------------+-------------+----------------+-----------------+--------------------+--------------------+------------+-------------+---------------+---------+--------------+-----------------+-----------------+--------------+---------------------+----------------+--------------+--------------+-------------------+---------------+----------------+
|           review_id|             user_id|         business_id|review_stars|        review_date|biz_avg_stars|biz_review_count|biz_checkin_count|biz_max_checkin_date|biz_min_checkin_date|biz_latitude|biz_longitude|biz_postal_code|biz_state|user_avg_stars|user_review_count|user_friend_count|user_fan_count|user_compliment_count|user_elite_count|user_elite_max|user_elite_min| user_yelping_since|target_ufc_bool|target_ufc_count|
+--------------------+--------------------+--------------------+------------+-------------------+-------------+----------------+------------

In [69]:
# Preview the first five rows of the non-text test subset.
non_text_data_test.show(5)

+--------------------+--------------------+--------------------+------------+-------------------+-------------+----------------+-----------------+--------------------+--------------------+------------+-------------+---------------+---------+--------------+-----------------+-----------------+--------------+---------------------+----------------+--------------+--------------+-------------------+---------------+----------------+
|           review_id|             user_id|         business_id|review_stars|        review_date|biz_avg_stars|biz_review_count|biz_checkin_count|biz_max_checkin_date|biz_min_checkin_date|biz_latitude|biz_longitude|biz_postal_code|biz_state|user_avg_stars|user_review_count|user_friend_count|user_fan_count|user_compliment_count|user_elite_count|user_elite_max|user_elite_min| user_yelping_since|target_ufc_bool|target_ufc_count|
+--------------------+--------------------+--------------------+------------+-------------------+-------------+----------------+------------

In [70]:
# Report the row count of each non-text subset, train first and then test.
for split_label, split_frame in (("Train", non_text_data_train), ("Test", non_text_data_test)):
    print(f'Non-Text {split_label} Records: {split_frame.count()}')

Non-Text Train Records: 5523992
Non-Text Test Records: 1382379


## Save Split Data

### To File

In [71]:
# text_data_train.coalesce(1).write.json(path='text_data_train.json')
# text_data_test.coalesce(1).write.json(path='text_data_test.json')

In [72]:
# non_text_data_train.coalesce(1).write.csv(path='non_text_data_train.csv', header=True, sep=',')
# non_text_data_test.coalesce(1).write.csv(path='non_text_data_test.csv', header=True, sep=',')

In [73]:
# holdout_data.coalesce(1).write.json(path='holdout_data.json')

### To AWS RDS

In [74]:
# Connection settings for the PostgreSQL target.
# The values come from environment variables so that credentials never have to
# be typed into (and saved with) the notebook. A variable that is not set
# leaves the corresponding value as None.
db_endpoint = os.environ.get("YELP_DB_ENDPOINT")  # used as the host part of the JDBC URL
db_name = os.environ.get("YELP_DB_NAME")
db_password = os.environ.get("YELP_DB_PASSWORD")

# Properties passed to DataFrameWriter.jdbc by the write cells below.
db_properties = {
    "user": "postgres",
    "password": db_password,
    "driver": "org.postgresql.Driver"
}

# NOTE: with the variables unset this renders as 'jdbc:postgresql://None/None';
# set them before enabling any of the JDBC writes.
db_url = f'jdbc:postgresql://{db_endpoint}/{db_name}'

In [75]:
# text_data_train.write.jdbc(url=db_url,table='text_data_train',mode='overwrite',properties=db_properties)
# text_data_test.write.jdbc(url=db_url,table='text_data_test',mode='overwrite',properties=db_properties)

In [76]:
# non_text_data_train.write.jdbc(url=db_url,table='non_text_data_train',mode='overwrite',properties=db_properties)
# non_text_data_test.write.jdbc(url=db_url,table='non_text_data_test',mode='overwrite',properties=db_properties)

In [77]:
# holdout_data.write.jdbc(url=db_url,table='holdout_data',mode='overwrite',properties=db_properties)