In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
spark_version = 'spark-3.2.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark

findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [1 InRelease 5,485 B/88.7                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [1 InRelease 34.4 kB/88.70% [Connecting to archive.ubuntu.com (91.189.88.142)] [1 InRelease 75.0 kB/88.70% [Connecting to archive.ubuntu.com (91.189.88.142)] [Connecting to ppa.launch0% [2 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                               Get:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:5 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:6 http://ppa.launchpad.net/cr

In [2]:
# Download the PostgreSQL JDBC driver jar (postgresql-42.2.9.jar) into the working directory
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2021-12-03 22:13:47--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2021-12-03 22:13:48 (4.85 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
from pyspark.sql import SparkSession

# Build (or reuse) the session with the Postgres JDBC jar on the driver classpath
spark = (
    SparkSession.builder
    .appName("Pet_Product_Reviews")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [4]:
# Fetch the Pet Products review TSV from the S3 bucket and load it into a DataFrame
from pyspark import SparkFiles

url="https://aws-reviews-dwc.s3.amazonaws.com/amazon_reviews_us_Pet_Products_v1_00.tsv"
spark.sparkContext.addFile(url)
outdoor_data_df = spark.read.csv(
    SparkFiles.get("amazon_reviews_us_Pet_Products_v1_00.tsv"),
    sep="\t",
    header=True,
    inferSchema=True,
)

# Preview the first rows, then report the total row count
outdoor_data_df.show(5)
outdoor_data_df.count()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   28794885| REAKC26P07MDN|B00Q0K9604|     510387886|(8-Pack) EZwhelp ...|    Pet Products|          5|            0|          0|   N|                Y|A great purchase ...|Best belly bands ...| 2015-08-31|
|         US|   11488901|R3NU7OMZ4HQIEG|B00MBW5O9W|     912374672|Warren Eckstein's...|    Pet Products|          2|    

2643619

In [5]:
# Discard every row that contains a null in any column
dropna_df = outdoor_data_df.na.drop()
dropna_df.show(5)
dropna_df.count()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   28794885| REAKC26P07MDN|B00Q0K9604|     510387886|(8-Pack) EZwhelp ...|    Pet Products|          5|            0|          0|   N|                Y|A great purchase ...|Best belly bands ...| 2015-08-31|
|         US|   11488901|R3NU7OMZ4HQIEG|B00MBW5O9W|     912374672|Warren Eckstein's...|    Pet Products|          2|    

2643241

In [6]:
# Column helper from the Spark SQL functions module
from pyspark.sql.functions import col

# Keep only the columns that feed the downstream tables
cleaned_df = dropna_df.select(
    'customer_id',
    'review_id',
    'product_id',
    'product_parent',
    'product_title',
    'star_rating',
    'helpful_votes',
    'total_votes',
    'vine',
    'review_date',
)
cleaned_df.show(5)

+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+-----------+
|customer_id|     review_id|product_id|product_parent|       product_title|star_rating|helpful_votes|total_votes|vine|review_date|
+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+-----------+
|   28794885| REAKC26P07MDN|B00Q0K9604|     510387886|(8-Pack) EZwhelp ...|          5|            0|          0|   N| 2015-08-31|
|   11488901|R3NU7OMZ4HQIEG|B00MBW5O9W|     912374672|Warren Eckstein's...|          2|            0|          1|   N| 2015-08-31|
|   43214993|R14QJW3XF8QO1P|B0084OHUIO|     902215727|Tyson's True Chew...|          5|            0|          0|   N| 2015-08-31|
|   12835065|R2HB7AX0394ZGY|B001GS71K2|     568880110|Soft Side Pet Cra...|          5|            0|          0|   N| 2015-08-31|
|   26334022| RGKMPDQGSAHR3|B004ABH1LG|     692846826|EliteField 3-Door...|        

In [7]:
# Number of rows (reviews) recorded for each customer_id

count_df = cleaned_df.groupby('customer_id').count()
count_df.show(5)
count_df.count()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|   10270641|    1|
|   18365872|    1|
|   16711087|    1|
|   10742726|    2|
|   41169638|    1|
+-----------+-----+
only showing top 5 rows



1414974

In [8]:
# Attach each customer's review count to every one of their review rows

joined_df = count_df.join(cleaned_df, 'customer_id', 'inner')
joined_df.show(5)
joined_df.count()

+-----------+-----+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+-----------+
|customer_id|count|     review_id|product_id|product_parent|       product_title|star_rating|helpful_votes|total_votes|vine|review_date|
+-----------+-----+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+-----------+
|      10206|    1|R26I75PJ1BMDBI|B00D5AO9JU|     359991213|PetArmorPro Advan...|          1|            0|          0|   N| 2014-10-01|
|      10227|    2|R1R3Y200ZA6PNC|B001INRIQW|     681369574|Kurgo Skybox Dog ...|          5|            0|          0|   N| 2013-07-09|
|      10227|    2|R3Q4ASMWTI8FAU|B007CREIES|     974180331|Flexi Freedom Sof...|          5|            0|          0|   N| 2013-07-09|
|      10228|    1| RQ2CNFO0EOHNK|B003SZR49E|     712138485|Pet Gear Roadster...|          5|            0|          0|   N| 2015-04-21|
|      10344|    1| RR0T2H3GMPVOK|B00UZKB

2643241

In [9]:
# Import only what is needed: a star import of pyspark.sql.functions would
# shadow Python builtins such as sum, max, min, round and abs for the whole kernel.
from pyspark.sql.functions import to_date

# Build a (review_id, review_date) frame where review_date is a DateType column.
# Aliasing directly to 'review_date' gives the same schema as selecting the old
# column alongside the converted one, dropping the old one and renaming.
dt_df = joined_df.select('review_id', to_date('review_date').alias('review_date'))
dt_df.show(5)
# Last expression of the cell: confirms the column type is now 'date'
dt_df.dtypes

+--------------+-----------+
|     review_id|review_date|
+--------------+-----------+
|R19ZSWI1PCRD5O| 2015-08-30|
|R2IOR1M9JRFXON| 2015-08-30|
|R26I75PJ1BMDBI| 2014-10-01|
|R1R3Y200ZA6PNC| 2013-07-09|
|R3Q4ASMWTI8FAU| 2013-07-09|
+--------------+-----------+
only showing top 5 rows



[('review_id', 'string'), ('review_date', 'date')]

In [10]:
# Row count of the date-converted frame
dt_df.count()

2643241

In [11]:
# Remove the unconverted review_date column; the converted one lives in dt_df

joined_df = joined_df.select([name for name in joined_df.columns if name != 'review_date'])
joined_df.show(1)

+-----------+-----+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+
|customer_id|count|     review_id|product_id|product_parent|       product_title|star_rating|helpful_votes|total_votes|vine|
+-----------+-----+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+
|      10206|    1|R26I75PJ1BMDBI|B00D5AO9JU|     359991213|PetArmorPro Advan...|          1|            0|          0|   N|
+-----------+-----+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+
only showing top 1 row



In [12]:
# Re-attach the converted review_date to the remaining review columns, matching on review_id

joined_dt_df = dt_df.join(joined_df, 'review_id', 'inner')
joined_dt_df.show(5)
joined_dt_df.count()

+--------------+-----------+-----------+-----+----------+--------------+--------------------+-----------+-------------+-----------+----+
|     review_id|review_date|customer_id|count|product_id|product_parent|       product_title|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-----------+-----+----------+--------------+--------------------+-----------+-------------+-----------+----+
|R10007MH6NTVFM| 2015-06-24|   25105396|    2|B0090Z9FFC|     675354291|KONG Pudge Braidz...|          3|            0|          0|   N|
|R1000CIZTRNP23| 2015-03-06|   25423435|    9|B00K1B6RCI|     308737701|PetZu Mother's Co...|          4|            3|          3|   N|
|R1000JOVLD0J41| 2010-11-13|   34245087|    3|B000TZ7022|     870738517|Aspen Pet Lebistr...|          2|            0|          0|   N|
|R10015VI9KRLUW| 2014-03-27|   18810197|    1|B005IZOB5M|     259423137|EcoSphere Closed ...|          5|            0|          0|   N|
|R1001RSHOZYRB6| 2014-08-14|   25116815| 

2643241

In [13]:
# Keep a single row per review_id

joined_dt_df = joined_dt_df.drop_duplicates(subset=['review_id'])
joined_dt_df.show(5)
joined_dt_df.count()

+--------------+-----------+-----------+-----+----------+--------------+--------------------+-----------+-------------+-----------+----+
|     review_id|review_date|customer_id|count|product_id|product_parent|       product_title|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-----------+-----+----------+--------------+--------------------+-----------+-------------+-----------+----+
|R10007MH6NTVFM| 2015-06-24|   25105396|    2|B0090Z9FFC|     675354291|KONG Pudge Braidz...|          3|            0|          0|   N|
|R1000CIZTRNP23| 2015-03-06|   25423435|    9|B00K1B6RCI|     308737701|PetZu Mother's Co...|          4|            3|          3|   N|
|R1000JOVLD0J41| 2010-11-13|   34245087|    3|B000TZ7022|     870738517|Aspen Pet Lebistr...|          2|            0|          0|   N|
|R10015VI9KRLUW| 2014-03-27|   18810197|    1|B005IZOB5M|     259423137|EcoSphere Closed ...|          5|            0|          0|   N|
|R1001RSHOZYRB6| 2014-08-14|   25116815| 

2643241

In [15]:
# review_id table: one row per review with its customer, product and date

review_id_df = joined_dt_df.select(
    'review_id',
    'customer_id',
    'product_id',
    'product_parent',
    'review_date',
)
review_id_df.show(5)

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R10007MH6NTVFM|   25105396|B0090Z9FFC|     675354291| 2015-06-24|
|R1000CIZTRNP23|   25423435|B00K1B6RCI|     308737701| 2015-03-06|
|R1000JOVLD0J41|   34245087|B000TZ7022|     870738517| 2010-11-13|
|R1000YCFR347I2|   48610961|B00CO527IC|     935823254| 2014-09-05|
|R1000ZYX4P32OD|   27895609|B002DXCQMM|     184150150| 2015-02-27|
+--------------+-----------+----------+--------------+-----------+
only showing top 5 rows



In [16]:
# products table: one row per product_id together with a title for it

products_df = (
    joined_dt_df
    .dropDuplicates(['product_id'])
    .select('product_id', 'product_title')
)
products_df.show(5)
products_df.count()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|0310824230|Advantage Flea Co...|
|0615553605|Pet Qwerks Treat ...|
|0684836483|250 Things You Ca...|
|0763004227|Golden Retriever ...|
|0764102885|Saint Bernards (B...|
+----------+--------------------+
only showing top 5 rows



239307

In [17]:
# customers table

# One row per customer_id, carrying the per-customer review count under the
# name customer_count. The former mid-cell cust_df.count() was removed: its
# result was never displayed (it was not the cell's last expression), yet it
# forced an extra full Spark job.
cust_df = (
    joined_dt_df
    .withColumnRenamed('count', 'customer_count')
    .dropDuplicates(['customer_id'])
)
customers_df = cust_df.select(['customer_id', 'customer_count'])
customers_df.show(5)

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   38226999|            24|
|   28607592|             1|
|   46135831|             9|
|   13893272|             3|
|   48646654|             8|
+-----------+--------------+
only showing top 5 rows



In [18]:
# vine table: rating and vote columns for reviews whose vine flag is "Y"

vine_df = (
    joined_dt_df
    .where('vine == "Y"')
    .select('review_id', 'star_rating', 'helpful_votes', 'total_votes', 'vine')
)
vine_df.show(5)

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R100I2RPEA7HUI|          5|            0|          0|   Y|
|R101ZZC5C3LWS3|          5|            1|          1|   Y|
|R102X2QBHZ54FH|          5|            0|          0|   Y|
|R104SGAEPUVJK6|          5|            0|          1|   Y|
|R1058T0A96VGIW|          4|            0|          0|   Y|
+--------------+-----------+-------------+-----------+----+
only showing top 5 rows



Postgres Setup

In [30]:
# Configure settings for RDS
# Upload a config.py to Colab first; this cell calls its ep_p() for the JDBC
# URL and its db() for the database password.
# The module is imported under an alias so that the `config` dict defined below
# does not rebind the module name -- otherwise re-running this cell would fail
# with AttributeError when calling ep_p() on the dict.
import config as rds_config

# NOTE(review): with "append", re-running a write cell presumably inserts the
# same rows again -- confirm the target tables tolerate that.
mode = "append"
jdbc_url = rds_config.ep_p()
# Connection properties passed to DataFrameWriter.jdbc by the write cells
config = {"user": "root",
          "password": rds_config.db(),
          "driver": "org.postgresql.Driver"}


In [20]:
# Persist review_id_df into the review_id_table table over JDBC

review_id_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table='review_id_table',
    properties=config,
)

In [21]:
# Persist products_df into the products table over JDBC

products_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table='products',
    properties=config,
)

In [22]:
# Persist customers_df into the customers table over JDBC

customers_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table='customers',
    properties=config,
)

In [23]:
# Persist vine_df into the vine_table table over JDBC

vine_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table='vine_table',
    properties=config,
)