<a href="https://colab.research.google.com/github/jbenasuli/final_project/blob/database/dev/database/Amazon_Vine-PySpark-ETLs/%20PRELIM_ETL_furniture.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
# Find the latest version of spark 2.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.1.1'
spark_version = 'spark-3.1.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ Packages [53.9 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [697 B]
Get:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:10 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:11 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:13 http://ppa.launchpad.net/cran/

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2021-05-02 23:40:55--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2021-05-02 23:40:57 (1.21 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BigData-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [4]:
import datetime
from pyspark.sql.functions import to_date
from pyspark.sql.functions import col
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, BooleanType, DateType
from pyspark import SparkFiles

### Load Amazon Data into Spark DataFrame

Note: Enter URL for Desired Dataset

In [5]:
#1 load product segment
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Furniture_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   24509695|R3VR960AHLFKDV|B004HB5E0E|     488241329|Shoal Creek Compu...|       Furniture|          4|            0|          0|   N|                Y|... desk is very ...|This desk is very...| 2015-08-31|
|         US|   34731776|R16LGVMFKIUT0G|B0042TNMMS|     205864445|Dorel Home Produc...|       Furniture|          5|    

### Create DataFrame - Perform Preliminary Cleaning

In [6]:
#1 Check the schema, print row & column count
df.printSchema()
print((df.count(), len(df.columns)))

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)

(792113, 15)


In [7]:
#2 Drop the round 1 columns
columns_to_drop = ['marketplace', 'product_parent', 'vine', 'review_headline', 'review_body', 'review_date']
df_dropped = df.drop(*columns_to_drop)
df_dropped.show()

+-----------+--------------+----------+--------------------+----------------+-----------+-------------+-----------+-----------------+
|customer_id|     review_id|product_id|       product_title|product_category|star_rating|helpful_votes|total_votes|verified_purchase|
+-----------+--------------+----------+--------------------+----------------+-----------+-------------+-----------+-----------------+
|   24509695|R3VR960AHLFKDV|B004HB5E0E|Shoal Creek Compu...|       Furniture|          4|            0|          0|                Y|
|   34731776|R16LGVMFKIUT0G|B0042TNMMS|Dorel Home Produc...|       Furniture|          5|            0|          0|                Y|
|    1272331|R1AIMEEPYHMOE4|B0030MPBZ4|Bathroom Vanity T...|       Furniture|          5|            1|          1|                Y|
|   45284262|R1892CCSZWZ9SR|B005G02ESA|Sleep Master Ulti...|       Furniture|          3|            0|          0|                Y|
|   30003523|R285P679YWVKD1|B005JS8AUA|1 1/4" GashGuards...|  

In [8]:
#3 Filter Step 1:
df_filtered = df_dropped.filter(df_dropped.verified_purchase == 'Y')
df_filtered.show
print((df_filtered.count(), len(df_filtered.columns)))

(718192, 9)


In [9]:
#4 drop filtered verified_purchase column
columns_to_drop = ['verified_purchase']
df_dropped_2 = df_filtered.drop(*columns_to_drop)
df_dropped_2.show()

+-----------+--------------+----------+--------------------+----------------+-----------+-------------+-----------+
|customer_id|     review_id|product_id|       product_title|product_category|star_rating|helpful_votes|total_votes|
+-----------+--------------+----------+--------------------+----------------+-----------+-------------+-----------+
|   24509695|R3VR960AHLFKDV|B004HB5E0E|Shoal Creek Compu...|       Furniture|          4|            0|          0|
|   34731776|R16LGVMFKIUT0G|B0042TNMMS|Dorel Home Produc...|       Furniture|          5|            0|          0|
|    1272331|R1AIMEEPYHMOE4|B0030MPBZ4|Bathroom Vanity T...|       Furniture|          5|            1|          1|
|   45284262|R1892CCSZWZ9SR|B005G02ESA|Sleep Master Ulti...|       Furniture|          3|            0|          0|
|   18311821| RLB33HJBXHZHU|B00AVUQQGQ|Serta Bonded Leat...|       Furniture|          5|            0|          0|
|   42943632|R1VGTZ94DBAD6A|B00CFY20GQ|Prepac Shoe Stora...|       Furni

## Create Analysis-Specific DFs/Tables
### Perform Analysis-Specific Transforms

### Segmentation Analysis DF

In [10]:
# 1 Create Segmentation DF by Droppig Addtional Columns
segmentation_cols_drop = ['review_id', 'product_id', 'product_title', 'star_rating', 'helpful_votes', 'total_votes']
segmentation_dropped_df = df_dropped_2.drop(*segmentation_cols_drop)
segmentation_dropped_df.show()

+-----------+----------------+
|customer_id|product_category|
+-----------+----------------+
|   24509695|       Furniture|
|   34731776|       Furniture|
|    1272331|       Furniture|
|   45284262|       Furniture|
|   18311821|       Furniture|
|   42943632|       Furniture|
|   43157304|       Furniture|
|   51918480|       Furniture|
|   14522766|       Furniture|
|   43054112|       Furniture|
|   26622950|       Furniture|
|   17988940|       Furniture|
|   18444952|       Furniture|
|   16937084|       Furniture|
|   23665632|       Furniture|
|    4110125|       Furniture|
|     107621|       Furniture|
|    2415090|       Furniture|
|   48285966|       Furniture|
|   33228559|       Furniture|
+-----------+----------------+
only showing top 20 rows



Note: must change Category Label name in withColumnRenamed('count(product_category)', 'name')

In [11]:
#2 Segmentation GroupBy
#2a GroupBy customer_id
#2b Count product_category and rename count columns as Segment Name
segment_df = segmentation_dropped_df.groupby("customer_id")\
.agg({'product_category':'count'}).withColumnRenamed('count(product_category)', 'furniture')
#2c Check results
segment_df.show()

+-----------+---------+
|customer_id|furniture|
+-----------+---------+
|   17067926|        2|
|   10714827|        1|
|   42560427|        1|
|   30717305|        1|
|    1178966|        1|
|   10429047|        1|
|   52541790|        2|
|   52512151|        1|
|   37534120|        1|
|   22555935|        1|
|   18681995|        1|
|    2119235|        2|
|   21846356|        1|
|   42251639|        1|
|    7730812|        1|
|   37666248|        1|
|   43676452|        1|
|   41466760|        1|
|   30403003|        1|
|   44524374|        1|
+-----------+---------+
only showing top 20 rows



In [12]:
#3 Check segment_df Schema and Row Count
segment_df.printSchema()
print(segment_df.count())

root
 |-- customer_id: integer (nullable = true)
 |-- furniture: long (nullable = false)

600425


Note: Column Name in df.sort('name' ...) must align with Column name from step 2

In [13]:
#4 Filter for Top n Results
#4a Declare number of rows to filter by (100,000)
row_count = 100000
#4a Sort by Segment Desc and limit to row_count
filtered_segment_df = segment_df.sort("furniture", ascending=False).limit(row_count)
#4b Check Results
filtered_segment_df.show()
print(filtered_segment_df.count())

+-----------+---------+
|customer_id|furniture|
+-----------+---------+
|   45212655|       33|
|   35178127|       27|
|   20845991|       25|
|   36020793|       25|
|   12609448|       24|
|   40418760|       22|
|   13278937|       22|
|   11643260|       19|
|   36700743|       18|
|    5669343|       17|
|   11159931|       17|
|   51201731|       17|
|   35095279|       17|
|   51672584|       17|
|   37870254|       16|
|   44471976|       16|
|   43450674|       16|
|   51032921|       16|
|   15153767|       15|
|   50114748|       15|
+-----------+---------+
only showing top 20 rows

100000


### Segmentation ETL Complete - Add Database Export Code

In [14]:
# Configure settings for RDS
mode = "append"
jdbc_url='jdbc:postgresql://<connection_srtring>'
config = {"user":'<username>', 
          "password": '<password>', 
          "driver":"org.postgresql.Driver"}

Note: table name in table='name_segment' must align with table name in Postgres

In [15]:
# Write segment table to Postgres/RDS
# xx mins
filtered_segment_df.write.jdbc(url=jdbc_url, table='furniture_segment', mode=mode, properties=config)

## Apriori Analysis DF
### Enter Apriori Transfomations Below - Use df_dropped_2 as Start Point

In [16]:
# 1 Create Apriori DF by First Dropping Addtional Columns
apriori_cols_drop = ['product_category', 'product_title', 'star_rating', 'helpful_votes', 'total_votes']
apriori_dropped_df = df_dropped_2.drop(*apriori_cols_drop)
apriori_dropped_df.show()

+-----------+--------------+----------+
|customer_id|     review_id|product_id|
+-----------+--------------+----------+
|   24509695|R3VR960AHLFKDV|B004HB5E0E|
|   34731776|R16LGVMFKIUT0G|B0042TNMMS|
|    1272331|R1AIMEEPYHMOE4|B0030MPBZ4|
|   45284262|R1892CCSZWZ9SR|B005G02ESA|
|   18311821| RLB33HJBXHZHU|B00AVUQQGQ|
|   42943632|R1VGTZ94DBAD6A|B00CFY20GQ|
|   43157304|R168KF82ICSOHD|B00FKC48QA|
|   51918480|R20DIYIJ0OCMOG|B00N9IAL9K|
|   14522766| RD46RNVOHNZSC|B001T4XU1C|
|   43054112|R2JDOCETTM3AXS|B002HRFLBC|
|   26622950|R33YMW36IDZ6LE|B006MISZOC|
|   17988940|R30ZGGUHZ04C1S|B008BMGABC|
|   18444952| RS2EZU76IK2BT|B00CO2VH5Y|
|   16937084|R1GJC1BP028XO9|B00LI4RJQ0|
|   23665632|R2VKJPGXXEK5GP|B0046EC1D0|
|    4110125|R17KS83G3KLT97|B00DQQPL36|
|     107621|R3PQL8SR4NEHWL|B003X7RWB2|
|    2415090|R2F5WW7WNO5RRG|B001TJYPJ8|
|   48285966|R3UDJKVWQCFIC9|B000TMHX9A|
|   33228559|R1MLGVJH3J5W6N|B005MZBB2O|
+-----------+--------------+----------+
only showing top 20 rows



### Prelim Apriori ETL Complete - Add Database Export Code

In [17]:
# Configure settings for RDS
mode = "append"
jdbc_url='jdbc:postgresql://<connection_srtring>'
config = {"user":'<username>', 
          "password": '<password>', 
          "driver":"org.postgresql.Driver"}

Note: table name in table='segment_apriori' must align with table name in Postgres

In [18]:
# Write segment_apriori table to RDS
# 8 mins 58 secs
apriori_dropped_df.write.jdbc(url=jdbc_url, table='furniture_apriori', mode=mode, properties=config)

## Amazon Reviews S3 -> Postgres/RDS ETL Complete



### Run Queries in Postgres to Confirm Segment ETL
Check Row Count of Segment Table - SELECT COUNT(*) FROM furniture_segment;

Check 10 Rows of Segment Table - SELECT * FROM furniture_segment LIMIT(10);


In [19]:
# Upon Confirmation of Above Checks Run This Cell
print('Segment ETL Successful')

Segment ETL Successful


### Run Queries in Postgres to Confirm Apriori ETL
Check Row Count of Apriori Table - SELECT COUNT(*) FROM furniture_apriori;

Check 10 Rows of Apriori Table - SELECT * FROM furniture_apriori LIMIT(10);

In [20]:
# Upon Confirmation of Above Checks Run This Cell
print('Apriori ETL Successful')

Apriori ETL Successful
