# Environmental Setup

In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

StatementMeta(dtcdezcasparkp, 4, 4, Finished, Available)

In [3]:
# Switch Spark's datetime parsing to the legacy parser policy for this session.
# NOTE(review): presumably required so the to_date/to_timestamp patterns used in
# later cells accept the raw Yelp date strings — confirm against the source data.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

StatementMeta(dtcdezcasparkp, 4, 5, Finished, Available)

In [None]:
# Explicit schema for the tip JSON file; `date` is read as a string and
# converted to a date type after loading.
tipSchema = (
    StructType()
    .add("text", StringType(), True)
    .add("date", StringType(), True)
    .add("compliment_count", IntegerType(), True)
    .add("business_id", StringType(), True)
    .add("user_id", StringType(), True)
)

# Explicit schema for the check-in JSON file: both fields are nullable strings
# (`date` is split and parsed into timestamps after loading).
checkinSchema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("business_id", "date")
])

# Explicit schema for the user JSON file. All fields are nullable; field order
# is kept as declared here because it determines the DataFrame column order.
userSchema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("user_id", StringType()),
        ("name", StringType()),
        ("review_count", IntegerType()),
        ("yelping_since", StringType()),  # parsed into a date in a later cell
        ("friends", StringType()),
        ("useful", IntegerType()),
        ("funny", IntegerType()),
        ("cool", IntegerType()),
        ("fans", IntegerType()),
        ("elite", StringType()),  # comma-separated years, split in the analysis cells
        ("average_stars", FloatType()),
        ("compliment_hot", IntegerType()),
        ("compliment_more", IntegerType()),
        ("compliment_profile", IntegerType()),
        ("compliment_cute", IntegerType()),
        ("compliment_list", IntegerType()),
        ("compliment_note", IntegerType()),
        ("compliment_plain", IntegerType()),
        ("compliment_cool", IntegerType()),
        ("compliment_funny", IntegerType()),
        ("compliment_writer", IntegerType()),
        ("compliment_photos", IntegerType()),
    ]
])

# Nested struct describing the `BusinessParking` attribute: one nullable
# boolean flag per parking option.
business_parking_schema = StructType([
    StructField(parking_option, BooleanType(), True)
    for parking_option in ("garage", "street", "validated", "lot", "valet")
])

# Subset of the business `attributes` object that this notebook declares.
# NOTE(review): assumes the raw JSON stores RestaurantsTakeOut as a real boolean
# and BusinessParking as a nested object; if the file stores them as strings
# they will not parse into these types — verify against the raw data.
attributes_schema = (
    StructType()
    .add("RestaurantsTakeOut", BooleanType(), True)
    .add("BusinessParking", business_parking_schema, True)
)

# Explicit schema for the business JSON file. All fields are nullable.
# Field names must match the JSON keys exactly: a name that does not match any
# key is silently loaded as an all-null column.
businessSchema = StructType([
    StructField("business_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("address", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    # The dataset key is "postal_code" (underscore); the previous "postal code"
    # (with a space) matched nothing and produced only nulls.
    StructField("postal_code", StringType(), True),
    StructField("latitude", FloatType(), True),
    StructField("longitude", FloatType(), True),
    StructField("stars", FloatType(), True),
    StructField("review_count", IntegerType(), True),
    StructField("is_open", IntegerType(), True),
    StructField("attributes", attributes_schema, True),
    # Comma-separated category names; split and exploded in the analysis cells.
    StructField("categories", StringType(), True),
    StructField("hours", MapType(StringType(), StringType()), True)
])


In [4]:
# Load the line-delimited tip JSON with the explicit schema, then convert the
# string `date` column to a date type.
tip_df = (
    spark.read
    .schema(tipSchema)
    .option("multiLine", "false")
    .format("json")
    .load('abfss://dtcdezca-adls@dtcdezcasa.dfs.core.windows.net/yelp_academic_dataset_tip.json')
    .withColumn("date", to_date("date", "yyyy-MM-dd"))
)

StatementMeta(dtcdezcasparkp, 4, 6, Finished, Available)

In [5]:
# Load the check-in JSON, where each business row carries one string holding
# many ", "-separated timestamps. Explode that string into one row per
# check-in and parse each piece into a timestamp.
checkin_df = (
    spark.read
    .schema(checkinSchema)
    .option("multiLine", "false")
    .format("json")
    .load('abfss://dtcdezca-adls@dtcdezcasa.dfs.core.windows.net/yelp_academic_dataset_checkin.json')
    .withColumn("date", explode(split("date", ", ")))
    .withColumn("date", to_timestamp("date", "yyyy-MM-dd HH:mm:ss"))
)

StatementMeta(dtcdezcasparkp, 4, 7, Finished, Available)

In [6]:
# Sanity check: `date` should now be a date type rather than a string.
tip_df.printSchema()

StatementMeta(dtcdezcasparkp, 4, 8, Finished, Available)

root
 |-- text: string (nullable = true)
 |-- date: date (nullable = true)
 |-- compliment_count: integer (nullable = true)
 |-- business_id: string (nullable = true)
 |-- user_id: string (nullable = true)



In [7]:
# Sanity check: `date` should now be a timestamp type rather than a string.
checkin_df.printSchema()

StatementMeta(dtcdezcasparkp, 4, 9, Finished, Available)

root
 |-- business_id: string (nullable = true)
 |-- date: timestamp (nullable = true)



In [8]:
# Preview a reproducible ~1% sample of the tips (fixed seed).
tip_df.sample(fraction=0.01, seed=3).show()

StatementMeta(dtcdezcasparkp, 4, 10, Finished, Available)

+--------------------+----------+----------------+--------------------+--------------------+
|                text|      date|compliment_count|         business_id|             user_id|
+--------------------+----------+----------------+--------------------+--------------------+
|Great breakfast b...|2016-08-01|               0|vaYy-IWyfh1ILji0l...|e67fm29pOiQUcPSlt...|
|Pizza with the He...|2012-03-18|               0|95d60_mjUg2kLy-bh...|5a7o8Z_jc1pabjavU...|
|Don't miss it, it...|2013-05-31|               0|jiGj-UE_2I2-An5IF...|6s5whJLSvZFIF0Am4...|
|The bakery items ...|2016-01-29|               0|YcP6XiXITpQY-RvL-...|fr1Hz2acAb3OaL3l6...|
|If you're looking...|2015-02-16|               0|8yR12PNSMo6FBYx1u...|4kVVUp5yxMohjW0s_...|
|Skip the soft dri...|2011-04-17|               0|AaTpjyw-EiODgi3tR...|vBFtIlBirAvgbTrxE...|
|Love the green cu...|2014-04-21|               0|vUkiYPpbkMXA99Wne...|QCrW0zYvHcWjtN0Wq...|
|Mmmmmm....... Del...|2011-04-09|               0|znEBYAqf13IBMNLO6...

In [9]:
# Preview a reproducible ~1% sample of the exploded check-ins (fixed seed).
checkin_df.sample(fraction=0.01, seed=3).show()

StatementMeta(dtcdezcasparkp, 4, 11, Finished, Available)

+--------------------+-------------------+
|         business_id|               date|
+--------------------+-------------------+
|--7PUidqRWpRSpXeb...|2014-05-02 15:49:55|
|--7jw19RH9JKXgFoh...|2017-05-21 16:07:21|
|--ARBQr1WMsTWiwOK...|2016-12-21 23:18:43|
|--MbOh2O1pATkXa7x...|2013-11-10 02:40:38|
|--O3ip9NpXTKD4oBS...|2010-11-21 21:09:44|
|--O3ip9NpXTKD4oBS...|2015-06-25 23:41:57|
|--O3ip9NpXTKD4oBS...|2016-05-22 20:03:08|
|--S43ruInmIsGrnnk...|2011-07-01 00:34:42|
|--S43ruInmIsGrnnk...|2011-11-25 15:41:20|
|--S43ruInmIsGrnnk...|2012-02-28 00:36:37|
|--S43ruInmIsGrnnk...|2014-02-20 23:21:32|
|--S43ruInmIsGrnnk...|2014-09-06 17:39:26|
|--S43ruInmIsGrnnk...|2016-08-20 19:45:39|
|--ZVrH2X2QXBFdCil...|2012-12-21 01:30:43|
|--_9CAxgfXZmoFdNI...|2015-10-16 20:02:01|
|--_9CAxgfXZmoFdNI...|2018-04-21 17:05:41|
|--epgcb7xHGuJ-4PU...|2013-04-07 14:35:01|
|--epgcb7xHGuJ-4PU...|2018-05-06 16:27:26|
|--gJkxbsiSIwsQKbi...|2019-01-25 16:59:40|
|--onnLZrsCazmcy2P...|2011-02-01 21:44:51|
+----------

In [None]:
# Derive calendar parts from the tip date; year and month become the parquet
# partition columns, day is stored as a regular column.
tip_df = (
    tip_df
    .withColumn("year", year("date"))
    .withColumn("month", month("date"))
    .withColumn("day", dayofmonth("date"))
)

# Use overwrite mode so that re-running the notebook does not fail because
# "/tip/" already exists (the writer's default mode raises an error when the
# target path exists). This matches the other writes in this notebook.
tip_df.write.mode("overwrite").partitionBy("year", "month").format("parquet").save("/tip/")

In [None]:
# Derive calendar parts from the check-in timestamp; year and month become the
# parquet partition columns, day is stored as a regular column.
checkin_df = (
    checkin_df
    .withColumn("year", year("date"))
    .withColumn("month", month("date"))
    .withColumn("day", dayofmonth("date"))
)

# Use overwrite mode so that re-running the notebook (for example after a
# cancelled or partial run) does not fail because "/checkin/" already exists;
# the writer's default mode raises an error when the target path exists.
checkin_df.write.mode("overwrite").partitionBy("year", "month").format("parquet").save("/checkin/")

StatementMeta(, , , Cancelled, )

In [14]:
# Reviews are read from the dedicated SQL pool's external table through the
# Synapse SQL connector rather than from the raw JSON file.
review_df = spark.read.synapsesql("dtcdezcasqlpool.dbo.ExternalReviewTable")

StatementMeta(dtcdezcasparkp, 4, 16, Finished, Available)

In [15]:
# Inspect the column names and types delivered by the external review table.
review_df.printSchema()

StatementMeta(dtcdezcasparkp, 4, 17, Finished, Available)

root
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: integer (nullable = true)
 |-- useful: integer (nullable = true)
 |-- funny: integer (nullable = true)
 |-- cool: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- id: string (nullable = true)



In [96]:
# Load the line-delimited business JSON with the explicit schema.
business_df = (
    spark.read
    .schema(businessSchema)
    .option("multiLine", "false")
    .format("json")
    .load("abfss://dtcdezca-adls@dtcdezcasa.dfs.core.windows.net/yelp_academic_dataset_business.json")
)

StatementMeta(dtcdezcasparkp, 4, 98, Finished, Available)

In [53]:
# Load the line-delimited user JSON with the explicit schema.
user_df = (
    spark.read
    .schema(userSchema)
    .option("multiLine", "false")
    .format("json")
    .load("abfss://dtcdezca-adls@dtcdezcasa.dfs.core.windows.net/yelp_academic_dataset_user.json")
)

StatementMeta(dtcdezcasparkp, 4, 55, Finished, Available)

In [54]:
# Confirm the user columns were loaded with the declared types
# (`yelping_since` and `elite` are still plain strings at this point).
user_df.printSchema()

StatementMeta(dtcdezcasparkp, 4, 56, Finished, Available)

root
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: integer (nullable = true)
 |-- yelping_since: string (nullable = true)
 |-- friends: string (nullable = true)
 |-- useful: integer (nullable = true)
 |-- funny: integer (nullable = true)
 |-- cool: integer (nullable = true)
 |-- fans: integer (nullable = true)
 |-- elite: string (nullable = true)
 |-- average_stars: float (nullable = true)
 |-- compliment_hot: integer (nullable = true)
 |-- compliment_more: integer (nullable = true)
 |-- compliment_profile: integer (nullable = true)
 |-- compliment_cute: integer (nullable = true)
 |-- compliment_list: integer (nullable = true)
 |-- compliment_note: integer (nullable = true)
 |-- compliment_plain: integer (nullable = true)
 |-- compliment_cool: integer (nullable = true)
 |-- compliment_funny: integer (nullable = true)
 |-- compliment_writer: integer (nullable = true)
 |-- compliment_photos: integer (nullable = true)



In [55]:
# Preview a reproducible ~1% sample of the users (fixed seed).
user_df.sample(fraction=0.01, seed=3).show()

StatementMeta(dtcdezcasparkp, 4, 57, Finished, Available)

+--------------------+--------------------+------------+-------------------+--------------------+------+-----+-----+----+--------------------+-------------+--------------+---------------+------------------+---------------+---------------+---------------+----------------+---------------+----------------+-----------------+-----------------+
|             user_id|                name|review_count|      yelping_since|             friends|useful|funny| cool|fans|               elite|average_stars|compliment_hot|compliment_more|compliment_profile|compliment_cute|compliment_list|compliment_note|compliment_plain|compliment_cool|compliment_funny|compliment_writer|compliment_photos|
+--------------------+--------------------+------------+-------------------+--------------------+------+-----+-----+----+--------------------+-------------+--------------+---------------+------------------+---------------+---------------+---------------+----------------+---------------+----------------+--------------

# Feature Analysis

In [64]:
def _count_by_year(df, date_col, count_name):
    """Return one row per calendar year of `date_col` with the row count named `count_name`."""
    return (
        df.withColumn("year", year(date_col))
          .groupBy("year")
          .count()
          .withColumnRenamed("count", count_name)
    )


# Yearly activity counts for each source table.
new_users = _count_by_year(user_df, "yelping_since", "new_users")
reviews = _count_by_year(review_df, "date", "reviews")
tips = _count_by_year(tip_df, "date", "tips")
checkins = _count_by_year(checkin_df, "date", "checkins")

# `elite` is a comma-separated list of years: one output row per (user, elite year).
elite_expanded = (
    user_df
    .withColumn("elite_array", split("elite", ","))
    .select("user_id", explode("elite_array").alias("elite_year"))
)
elite_users_by_year = (
    elite_expanded
    .groupBy("elite_year")
    .count()
    .withColumnRenamed("count", "elite_users")
    .withColumnRenamed("elite_year", "year")
)

# Full outer joins keep every year that appears in at least one of the inputs.
aggregate_df = new_users
for yearly_counts in (reviews, elite_users_by_year, tips, checkins):
    aggregate_df = aggregate_df.join(yearly_counts, "year", "outer")

aggregate_df

StatementMeta(dtcdezcasparkp, 4, 66, Finished, Available)

DataFrame[year: string, new_users: bigint, reviews: bigint, elite_users: bigint, tips: bigint, checkins: bigint]

In [65]:
# Inspect the combined yearly counts (first 20 rows).
aggregate_df.show()

StatementMeta(dtcdezcasparkp, 4, 67, Finished, Available)

+----+---------+-------+-----------+------+--------+
|year|new_users|reviews|elite_users|  tips|checkins|
+----+---------+-------+-----------+------+--------+
|    |     null|   null|    1896699|  null|    null|
|  20|     null|   null|      79858|  null|    null|
|2004|       90|   null|       null|  null|    null|
|2005|      937|    552|       null|  null|    null|
|2006|     5423|   2289|        775|  null|    null|
|2007|    15340|   9545|       2023|  null|    null|
|2008|    31097|  30262|       3185|  null|    null|
|2009|    64911|  46888|       5479|   665|       2|
|2010|   109054|  87316|       8772| 26712|  209153|
|2011|   176435| 144931|      10997| 83395|  901457|
|2012|   195955| 179992|      15222|110459| 1289492|
|2013|   209762| 240319|      16193|107563| 1552799|
|2014|   233465| 323855|      18571|109160| 1625884|
|2015|   247850| 426403|      24175| 89686| 1709857|
|2016|   217620| 473753|      29636| 94333| 1554774|
|2017|   151024| 512153|      36015| 93909| 13

A problem occurred: an unexpected year value '20' (along with an empty year) appears in the elite-user counts. This needs investigation.

In [None]:
# Persist the yearly aggregate as a managed table, replacing any previous version.
aggregate_df.write.saveAsTable("yelp_feature_table", mode="overwrite")

StatementMeta(, , , Cancelled, )

In [67]:
# Look at the exploded elite entries whose value is the suspicious year 20.
is_year_20 = col("elite_year") == 20
elite_expanded.select("elite_year").where(is_year_20).show()

StatementMeta(dtcdezcasparkp, 4, 69, Finished, Available)

+----------+
|elite_year|
+----------+
|        20|
|        20|
|        20|
|        20|
|        20|
|        20|
|        20|
|        20|
|        20|
|        20|
|        20|
|        20|
|        20|
|        20|
|        20|
|        20|
|        20|
|        20|
|        20|
|        20|
+----------+
only showing top 20 rows



In [74]:
# Show the raw `elite` strings that contain a standalone ",20," token to see
# where the bogus year comes from.
has_bare_20_token = col("elite").contains(",20,")
rows_with_20 = user_df.where(has_bare_20_token)
rows_with_20.select("elite").show(truncate=False)

StatementMeta(dtcdezcasparkp, 4, 76, Finished, Available)

+--------------------------------------------------------------------------------+
|elite                                                                           |
+--------------------------------------------------------------------------------+
|2009,2010,2011,2012,2013,2014,2015,2016,2017,2018,2019,20,20,2021               |
|2010,2011,2012,2013,2014,2015,2016,2017,2018,2019,20,20,2021                    |
|2010,2011,2012,2013,2014,2015,2016,2017,2018,2019,20,20,2021                    |
|2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016,2017,2018,2019,20,20,2021|
|2010,2011,2012,2013,2014,2015,2016,2017,2018,2019,20,20,2021                    |
|2009,2010,2011,2012,2013,2014,2015,2016,2017,2018,2019,20,20,2021               |
|2011,2012,2013,2014,2015,2016,2017,2018,2019,20,20,2021                         |
|2010,2011,2012,2013,2014,2015,2016,2017,2018,2019,20,20,2021                    |
|2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016,2017,2018,2019,20,20,2021|
|201

In [75]:
# Repair elite lists in which the year 2020 was stored as the two tokens "20,20".
# The digit lookarounds restrict the match to a standalone "20,20" pair. Without
# them the pattern also matches inside already-repaired text ("2020,2021"
# contains "20,20"), so re-running this cell would corrupt it to "20202021".
# With them, the cell is safe to run more than once.
user_df = user_df.withColumn("elite", regexp_replace("elite", r"(?<!\d)20,20(?!\d)", "2020"))
user_df.select("elite").show(30, truncate=False)

StatementMeta(dtcdezcasparkp, 4, 77, Finished, Available)

+----------------------------------------------------------------+
|elite                                                           |
+----------------------------------------------------------------+
|2007                                                            |
|2009,2010,2011,2012,2013,2014,2015,2016,2017,2018,2019,2020,2021|
|2009,2010,2011,2012,2013                                        |
|2009,2010,2011                                                  |
|                                                                |
|2006,2007,2008,2009,2010,2011,2012,2013,2014                    |
|                                                                |
|                                                                |
|                                                                |
|                                                                |
|                                                                |
|2009,2010,2011,2012                                          

In [92]:
from pyspark.sql.functions import regexp_extract

# `filtered_elite` is not defined by any visible cell of this notebook, so on a
# fresh kernel this cell failed with a NameError. Rebuild it here from `user_df`:
#   * repair "20,20" -> "2020" (lookarounds make this a no-op on repaired data),
#   * drop users with an empty elite string,
#   * split the comma-separated years into an array.
# NOTE(review): reconstructed from how the variable is used below — confirm it
# matches the original definition.
filtered_elite = (
    user_df
    .withColumn("elite", regexp_replace("elite", r"(?<!\d)20,20(?!\d)", "2020"))
    .filter(col("elite") != "")
    .withColumn("elite_array", split("elite", ","))
)

# One row per (user, elite year); keep only tokens that are exactly four digits.
# regexp_extract returns "" for anything that does not match, which is then
# filtered out.
elite_expanded = filtered_elite.select("user_id", explode(col("elite_array")).alias("elite_year"))
elite_expanded = elite_expanded.withColumn("elite_year", regexp_extract(col("elite_year"), "^\\d{4}$", 0))

elite_expanded = elite_expanded.filter(elite_expanded.elite_year != "")

elite_users_by_year = elite_expanded.groupBy("elite_year").count().withColumnRenamed("count", "elite_users").withColumnRenamed("elite_year", "year")

# Re-join the yearly counts using the cleaned elite figures.
aggregate_df = new_users.join(reviews, "year", "outer") \
                        .join(elite_users_by_year, "year", "outer") \
                        .join(tips, "year", "outer") \
                        .join(checkins, "year", "outer")

aggregate_df.filter(col("year").isNotNull()).show()


StatementMeta(dtcdezcasparkp, 4, 94, Finished, Available)

+----+---------+-------+-----------+------+--------+
|year|new_users|reviews|elite_users|  tips|checkins|
+----+---------+-------+-----------+------+--------+
|2004|       90|   null|       null|  null|    null|
|2005|      937|    552|       null|  null|    null|
|2006|     5423|   2289|        775|  null|    null|
|2007|    15340|   9545|       2023|  null|    null|
|2008|    31097|  30262|       3185|  null|    null|
|2009|    64911|  46888|       5479|   665|       2|
|2010|   109054|  87316|       8772| 26712|  209153|
|2011|   176435| 144931|      10997| 83395|  901457|
|2012|   195955| 179992|      15222|110459| 1289492|
|2013|   209762| 240319|      16193|107563| 1552799|
|2014|   233465| 323855|      18571|109160| 1625884|
|2015|   247850| 426403|      24175| 89686| 1709857|
|2016|   217620| 473753|      29636| 94333| 1554774|
|2017|   151024| 512153|      36015| 93909| 1348466|
|2018|   133568| 579055|      41009| 67033| 1157252|
|2019|   104655| 603244|      44044| 57646| 10

# Output to Data Warehouse

In [100]:
# Base ADLS Gen2 container URI; the CSV export cells below append their output
# names to this prefix.
container_path = 'abfss://dtcdezca-adls@dtcdezcasa.dfs.core.windows.net/'

StatementMeta(dtcdezcasparkp, 4, 102, Finished, Available)

In [93]:
# Export the yearly aggregate as CSV with a header row, replacing any previous
# export at the same location.
output_file = 'yelp_feature.csv'
aggregate_df.write.csv(container_path + output_file, mode="overwrite", header="true")

StatementMeta(dtcdezcasparkp, 4, 95, Finished, Available)

In [98]:
# `categories` is a ", "-separated string: produce one row per (business,
# category), then count businesses per category, most frequent first.
category_tokens = split(col("categories"), ", ")
categories_df = business_df.withColumn("category", explode(category_tokens))
category_distribution = (
    categories_df
    .groupBy("category")
    .count()
    .orderBy(col("count").desc())
)

category_distribution.show(truncate=False)

StatementMeta(dtcdezcasparkp, 4, 100, Finished, Available)

+-------------------------+-----+
|category                 |count|
+-------------------------+-----+
|Restaurants              |52268|
|Food                     |27781|
|Shopping                 |24395|
|Home Services            |14356|
|Beauty & Spas            |14292|
|Nightlife                |12281|
|Health & Medical         |11890|
|Local Services           |11198|
|Bars                     |11065|
|Automotive               |10773|
|Event Planning & Services|9895 |
|Sandwiches               |8366 |
|American (Traditional)   |8139 |
|Active Life              |7687 |
|Pizza                    |7093 |
|Coffee & Tea             |6703 |
|Fast Food                |6472 |
|Breakfast & Brunch       |6239 |
|American (New)           |6097 |
|Hotels & Travel          |5857 |
+-------------------------+-----+
only showing top 20 rows



In [101]:
# Export the category counts as CSV with a header row, replacing any previous
# export at the same location.
output_file = 'category_distribution.csv'

category_distribution.write.csv(container_path + output_file, mode="overwrite", header="true")

StatementMeta(dtcdezcasparkp, 4, 103, Finished, Available)

In [102]:
# Parse the sign-up date and derive its year and month as columns on `user_df`.
user_df = (
    user_df
    .withColumn("yelping_since", to_date("yelping_since", "yyyy-MM-dd"))
    .withColumn("year", year("yelping_since"))
    .withColumn("month", month("yelping_since"))
)

# Number of users who joined in each calendar month, in chronological order.
new_users_per_month = (
    user_df
    .groupBy("year", "month")
    .count()
    .orderBy("year", "month")
)

new_users_per_month.show()

StatementMeta(dtcdezcasparkp, 4, 104, Finished, Available)

+----+-----+-----+
|year|month|count|
+----+-----+-----+
|2004|   10|   53|
|2004|   11|   14|
|2004|   12|   23|
|2005|    1|   24|
|2005|    2|   29|
|2005|    3|   51|
|2005|    4|   72|
|2005|    5|   69|
|2005|    6|   64|
|2005|    7|   97|
|2005|    8|   95|
|2005|    9|   71|
|2005|   10|   75|
|2005|   11|  113|
|2005|   12|  177|
|2006|    1|  269|
|2006|    2|  235|
|2006|    3|  329|
|2006|    4|  318|
|2006|    5|  275|
+----+-----+-----+
only showing top 20 rows



In [103]:
# Parse the sign-up date and derive its year and quarter as columns on `user_df`.
user_df = (
    user_df
    .withColumn("yelping_since", to_date("yelping_since", "yyyy-MM-dd"))
    .withColumn("year", year("yelping_since"))
    .withColumn("quarter", quarter("yelping_since"))
)

# Number of users who joined in each calendar quarter, in chronological order.
new_users_per_quarter = (
    user_df
    .groupBy("year", "quarter")
    .count()
    .orderBy("year", "quarter")
)

new_users_per_quarter.show()

StatementMeta(dtcdezcasparkp, 4, 105, Finished, Available)

+----+-------+-----+
|year|quarter|count|
+----+-------+-----+
|2004|      4|   90|
|2005|      1|  104|
|2005|      2|  205|
|2005|      3|  263|
|2005|      4|  365|
|2006|      1|  833|
|2006|      2|  947|
|2006|      3| 1785|
|2006|      4| 1858|
|2007|      1| 2916|
|2007|      2| 3631|
|2007|      3| 4619|
|2007|      4| 4174|
|2008|      1| 5913|
|2008|      2| 6826|
|2008|      3| 9356|
|2008|      4| 9002|
|2009|      1|12952|
|2009|      2|16127|
|2009|      3|19555|
+----+-------+-----+
only showing top 20 rows



In [104]:
# Export the quarterly sign-up counts as CSV with a header row, replacing any
# previous export at the same location.
output_file = 'new_users_per_quarter.csv'

new_users_per_quarter.write.csv(container_path + output_file, mode="overwrite", header="true")

StatementMeta(dtcdezcasparkp, 4, 106, Finished, Available)