In [1]:
#Before using spark, need to install and set up spark, Hadoop and Jdk locally on your laptop


from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("etl") \
    .config("spark.master", "local") \
    .config("spark.jars", "F:/UniPrj/Big Data Project/postgresql-42.2.16.jar") \
    .getOrCreate()

#Load Amazon Data into Spark DataFrame
#1 load product segment - Furniture Segment --> download dataset tsv file
file_path = "F:/UniPrj/Big Data Project/Datasets/amazon_reviews_us_Furniture_v1_00.tsv"
df = spark.read.option("encoding", "UTF-8").csv(file_path, sep="\t", header=True, inferSchema=True)

# Show the first few rows of the DataFrame
df.show()


+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   24509695|R3VR960AHLFKDV|B004HB5E0E|     488241329|Shoal Creek Compu...|       Furniture|          4|            0|          0|   N|                Y|... desk is very ...|This desk is very...| 2015-08-31|
|         US|   34731776|R16LGVMFKIUT0G|B0042TNMMS|     205864445|Dorel Home Produc...|       Furniture|          5|    

In [2]:
#Create DataFrame - Perform Preliminary Cleaning

#1 Check the schema, print row & column count
df.printSchema()
print((df.count(), len(df.columns)))

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)

(792113, 15)


In [3]:
#2 Drop the round 1 columns
columns_to_drop = ['marketplace', 'product_parent', 'vine', 'review_headline', 'review_body', 'review_date']
df_dropped = df.drop(*columns_to_drop)
df_dropped.show()

+-----------+--------------+----------+--------------------+----------------+-----------+-------------+-----------+-----------------+
|customer_id|     review_id|product_id|       product_title|product_category|star_rating|helpful_votes|total_votes|verified_purchase|
+-----------+--------------+----------+--------------------+----------------+-----------+-------------+-----------+-----------------+
|   24509695|R3VR960AHLFKDV|B004HB5E0E|Shoal Creek Compu...|       Furniture|          4|            0|          0|                Y|
|   34731776|R16LGVMFKIUT0G|B0042TNMMS|Dorel Home Produc...|       Furniture|          5|            0|          0|                Y|
|    1272331|R1AIMEEPYHMOE4|B0030MPBZ4|Bathroom Vanity T...|       Furniture|          5|            1|          1|                Y|
|   45284262|R1892CCSZWZ9SR|B005G02ESA|Sleep Master Ulti...|       Furniture|          3|            0|          0|                Y|
|   30003523|R285P679YWVKD1|B005JS8AUA|1 1/4" GashGuards...|  

In [4]:
#3 Filter Step 1:
df_filtered = df_dropped.filter(df_dropped.verified_purchase == 'Y')
df_filtered.show
print((df_filtered.count(), len(df_filtered.columns)))

(718192, 9)


In [5]:
#4 drop filtered verified_purchase column
columns_to_drop = ['verified_purchase']
df_dropped_2 = df_filtered.drop(*columns_to_drop)
df_dropped_2.show()

+-----------+--------------+----------+--------------------+----------------+-----------+-------------+-----------+
|customer_id|     review_id|product_id|       product_title|product_category|star_rating|helpful_votes|total_votes|
+-----------+--------------+----------+--------------------+----------------+-----------+-------------+-----------+
|   24509695|R3VR960AHLFKDV|B004HB5E0E|Shoal Creek Compu...|       Furniture|          4|            0|          0|
|   34731776|R16LGVMFKIUT0G|B0042TNMMS|Dorel Home Produc...|       Furniture|          5|            0|          0|
|    1272331|R1AIMEEPYHMOE4|B0030MPBZ4|Bathroom Vanity T...|       Furniture|          5|            1|          1|
|   45284262|R1892CCSZWZ9SR|B005G02ESA|Sleep Master Ulti...|       Furniture|          3|            0|          0|
|   18311821| RLB33HJBXHZHU|B00AVUQQGQ|Serta Bonded Leat...|       Furniture|          5|            0|          0|
|   42943632|R1VGTZ94DBAD6A|B00CFY20GQ|Prepac Shoe Stora...|       Furni

In [6]:
#Create Analysis-Specific DFs/Tables
#Perform Analysis-Specific Transforms
#Segmentation Analysis DF

# 1 Create Segmentation DF by Droppig Addtional Columns
segmentation_cols_drop = ['review_id', 'product_id', 'product_title', 'star_rating', 'helpful_votes', 'total_votes']
segmentation_dropped_df = df_dropped_2.drop(*segmentation_cols_drop)
segmentation_dropped_df.show()

+-----------+----------------+
|customer_id|product_category|
+-----------+----------------+
|   24509695|       Furniture|
|   34731776|       Furniture|
|    1272331|       Furniture|
|   45284262|       Furniture|
|   18311821|       Furniture|
|   42943632|       Furniture|
|   43157304|       Furniture|
|   51918480|       Furniture|
|   14522766|       Furniture|
|   43054112|       Furniture|
|   26622950|       Furniture|
|   17988940|       Furniture|
|   18444952|       Furniture|
|   16937084|       Furniture|
|   23665632|       Furniture|
|    4110125|       Furniture|
|     107621|       Furniture|
|    2415090|       Furniture|
|   48285966|       Furniture|
|   33228559|       Furniture|
+-----------+----------------+
only showing top 20 rows



In [7]:
#Note: must change Category Label name in withColumnRenamed('count(product_category)', 'name')

#2 Segmentation GroupBy
#2a GroupBy customer_id
#2b Count product_category and rename count columns as Segment Name
segment_df = segmentation_dropped_df.groupby("customer_id")\
.agg({'product_category':'count'}).withColumnRenamed('count(product_category)', 'furniture')
#2c Check results
segment_df.show()

+-----------+---------+
|customer_id|furniture|
+-----------+---------+
|   17067926|        2|
|   10714827|        1|
|   42560427|        1|
|   30717305|        1|
|    1178966|        1|
|   10429047|        1|
|   52541790|        2|
|   52512151|        1|
|   37534120|        1|
|   22555935|        1|
|   18681995|        1|
|    2119235|        2|
|   21846356|        1|
|   42251639|        1|
|    7730812|        1|
|   37666248|        1|
|   43676452|        1|
|   41466760|        1|
|   30403003|        1|
|   44524374|        1|
+-----------+---------+
only showing top 20 rows



In [8]:
#3 Check segment_df Schema and Row Count
segment_df.printSchema()
print(segment_df.count())

root
 |-- customer_id: integer (nullable = true)
 |-- furniture: long (nullable = false)

600425


In [9]:
#Note: Column Name in df.sort('name' ...) must align with Column name from step 2
#4 Filter for Top n Results
#4a Declare number of rows to filter by (100,000)
row_count = 100000
#4a Sort by Segment Desc and limit to row_count
filtered_segment_df = segment_df.sort("furniture", ascending=False).limit(row_count)
#4b Check Results
filtered_segment_df.show()
print(filtered_segment_df.count())    

+-----------+---------+
|customer_id|furniture|
+-----------+---------+
|   45212655|       33|
|   35178127|       27|
|   20845991|       25|
|   36020793|       25|
|   12609448|       24|
|   40418760|       22|
|   13278937|       22|
|   11643260|       19|
|   36700743|       18|
|    5669343|       17|
|   11159931|       17|
|   51201731|       17|
|   35095279|       17|
|   51672584|       17|
|   43450674|       16|
|   51032921|       16|
|   37870254|       16|
|   44471976|       16|
|   37864438|       15|
|   15153767|       15|
+-----------+---------+
only showing top 20 rows

100000


In [10]:
#Segmentation ETL Complete - Add Database Export Code

# Configure settings for RDS
mode = "append"
jdbc_url = 'jdbc:postgresql://localhost:5432/AmazonProducts'
config = {"user":'postgres', 
          "password": 'root', 
          "driver":"org.postgresql.Driver"}


In [11]:
#Note: table name in table='name_segment' must align with table name in Postgres

# Write segment table to Postgres/RDS
# 1 min 5 secs
filtered_segment_df.write.jdbc(url=jdbc_url, table='furniture_segment', mode=mode, properties=config)
     

In [12]:
# Run Queries in Postgres to Confirm Segment ETL
# Check Row Count of Segment Table - SELECT COUNT(*) FROM furniture_segment;
# Check 10 Rows of Segment Table - SELECT * FROM furniture_segment LIMIT(10);
print('Segment ETL Successful')

Segment ETL Successful
