In [0]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-05-06 20:04:33--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2020-05-06 20:04:33 (3.63 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [31]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Pet_Products_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

pet_products_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Pet_Products_v1_00.tsv.gz"), sep="\t", header=True)
pet_products_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   28794885| REAKC26P07MDN|B00Q0K9604|     510387886|(8-Pack) EZwhelp ...|    Pet Products|          5|            0|          0|   N|                Y|A great purchase ...|Best belly bands ...| 2015-08-31|
|         US|   11488901|R3NU7OMZ4HQIEG|B00MBW5O9W|     912374672|Warren Eckstein's...|    Pet Products|          2|    

## Count number of records (rows) in dataset

In [32]:
print("Original Count", pet_products_df.count())

Original Count 2643619


## Drop incomplete rows

In [33]:
pet_products_df = pet_products_df.dropna()
print("New Count", pet_products_df.count())

New Count 2643241


## Examine the schema

In [34]:
pet_products_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



## Create dataframes that match table schema file
## Create Review ID Table


In [35]:
# Create new df with required columns
review_id_df = pet_products_df.select(["review_id", "customer_id", "product_id", "product_parent", "review_date"])
review_id_df.show(5)

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
| REAKC26P07MDN|   28794885|B00Q0K9604|     510387886| 2015-08-31|
|R3NU7OMZ4HQIEG|   11488901|B00MBW5O9W|     912374672| 2015-08-31|
|R14QJW3XF8QO1P|   43214993|B0084OHUIO|     902215727| 2015-08-31|
|R2HB7AX0394ZGY|   12835065|B001GS71K2|     568880110| 2015-08-31|
| RGKMPDQGSAHR3|   26334022|B004ABH1LG|     692846826| 2015-08-31|
+--------------+-----------+----------+--------------+-----------+
only showing top 5 rows



In [0]:
# Drop duplicates
review_id_df = review_id_df.dropDuplicates(["review_id"])

In [37]:
# Ensure column data types match schema
from pyspark.sql.types import * 

review_id_df = review_id_df.withColumn("customer_id",review_id_df["customer_id"].cast(IntegerType()))\
    .withColumn("product_parent",review_id_df["product_parent"].cast(IntegerType()))\
    .withColumn("review_date",review_id_df["review_date"].cast(DateType()))

review_id_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: date (nullable = true)



## Create Products Table

In [38]:
# Create new df with required columns
products_df = pet_products_df.select(["product_id", "product_title"])
products_df.show(5)

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00Q0K9604|(8-Pack) EZwhelp ...|
|B00MBW5O9W|Warren Eckstein's...|
|B0084OHUIO|Tyson's True Chew...|
|B001GS71K2|Soft Side Pet Cra...|
|B004ABH1LG|EliteField 3-Door...|
+----------+--------------------+
only showing top 5 rows



In [0]:
# Drop duplicates
products_df = products_df.dropDuplicates(["product_id"])

In [40]:
# Ensure column data types match schema
products_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_title: string (nullable = true)



## Create Customers Table

In [41]:
# Create new df with required columns
customers_df = pet_products_df.select(["customer_id"])
customers_df.show(5)

+-----------+
|customer_id|
+-----------+
|   28794885|
|   11488901|
|   43214993|
|   12835065|
|   26334022|
+-----------+
only showing top 5 rows



In [42]:
# Group by customer id to get customer count
# No need to drop duplicates as customer ids will be grouped and dropping duplicates would interfere with count values
customers_df = customers_df.groupBy("customer_id").count()
customers_df.orderBy("customer_id").select(["customer_id", "count"])
customers_df.show(5)

+-----------+-----+
|customer_id|count|
+-----------+-----+
|   30238476|    1|
|   51090028|    1|
|   14286306|   10|
|    3223564|    1|
|   16794688|    5|
+-----------+-----+
only showing top 5 rows



In [43]:
# Rename count column
customers_df = customers_df.withColumnRenamed("count", "customer_count")
customers_df.show(5)

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   30238476|             1|
|   51090028|             1|
|   14286306|            10|
|    3223564|             1|
|   16794688|             5|
+-----------+--------------+
only showing top 5 rows



In [44]:
# Ensure column data types match schema
customers_df = customers_df.withColumn("customer_id",customers_df["customer_id"].cast(IntegerType()))\
    .withColumn("customer_count",customers_df["customer_count"].cast(IntegerType()))

customers_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: integer (nullable = false)



## Create Vine Table

In [47]:
# Create new df with required columns
vine_df = pet_products_df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine"])
vine_df.show(5)

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
| REAKC26P07MDN|          5|            0|          0|   N|
|R3NU7OMZ4HQIEG|          2|            0|          1|   N|
|R14QJW3XF8QO1P|          5|            0|          0|   N|
|R2HB7AX0394ZGY|          5|            0|          0|   N|
| RGKMPDQGSAHR3|          5|            0|          0|   N|
+--------------+-----------+-------------+-----------+----+
only showing top 5 rows



In [0]:
# Drop duplicates
vine_df = vine_df.dropDuplicates(["review_id"])

In [49]:
# Ensure column data types match schema
vine_df = vine_df.withColumn("star_rating",vine_df["star_rating"].cast(IntegerType()))\
    .withColumn("helpful_votes",vine_df["helpful_votes"].cast(IntegerType()))\
    .withColumn("total_votes",vine_df["total_votes"].cast(IntegerType()))

vine_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)



In [50]:
# Create two dataframes, one for Vine reviews and one for non-Vine reviews
# Vine review dataframe
vine_reviews_df = vine_df.filter(vine_df["vine"] == "Y")
vine_reviews_df.show(5)

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R10AS0O8BTREFQ|          5|            0|          1|   Y|
|R11I573BGRYGZ6|          4|            1|          1|   Y|
|R11OTVBNEK0AR3|          3|            0|          0|   Y|
|R12KX2MY2DIV9N|          4|            0|          0|   Y|
|R17W27ZKRR1VUN|          3|            0|          0|   Y|
+--------------+-----------+-------------+-----------+----+
only showing top 5 rows



In [51]:
# Non-vine review dataframe
non_vine_reviews_df = vine_df.filter(vine_df["vine"] == "N")
non_vine_reviews_df.show(5)

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R101CJ0JBI9J43|          5|            6|          6|   N|
|R101P7GUI5O1WG|          5|            0|          0|   N|
|R102LL91IX39PK|          5|            0|          0|   N|
|R102YVSWWEAP5K|          5|            0|          0|   N|
|R1042WRU1WDUHY|          5|            7|          8|   N|
+--------------+-----------+-------------+-----------+----+
only showing top 5 rows



In [52]:
# What's the total number of Vine vs. non-Vine reviews?
print("Total Number of Vine Reviews", vine_reviews_df.count())
print("Total Number of Non-Vine Reviews", non_vine_reviews_df.count())

Total Number of Vine Reviews 10215
Total Number of Non-Vine Reviews 2633026


In [54]:
# How many Vine reviews rate the products as 5 stars?
star_vine_df = vine_reviews_df.groupBy("star_rating").count()
star_vine_df.orderBy("star_rating").select(["star_rating", "count"]).show()

+-----------+-----+
|star_rating|count|
+-----------+-----+
|          1|  234|
|          2|  633|
|          3| 1703|
|          4| 3302|
|          5| 4343|
+-----------+-----+



In [55]:
# How many Non-Vine reviews rate the products as 5 stars?
star_non_vine_df = non_vine_reviews_df.groupBy("star_rating").count()
star_non_vine_df.orderBy("star_rating").select(["star_rating", "count"]).show()

+-----------+-------+
|star_rating|  count|
+-----------+-------+
|          1| 248586|
|          2| 150645|
|          3| 214915|
|          4| 377940|
|          5|1640940|
+-----------+-------+



In [56]:
# What was the average rating for Vine reviews?
from pyspark.sql.functions import mean
vine_reviews_df.select(mean("star_rating")).show()

+------------------+
|  avg(star_rating)|
+------------------+
|4.0657856093979445|
+------------------+



In [57]:
# What was the average rating for Non-Vine reviews?
non_vine_reviews_df.select(mean("star_rating")).show()

+-----------------+
| avg(star_rating)|
+-----------------+
|4.143932114608819|
+-----------------+



In [58]:
# How many helpful votes did Vine reviews recieve?
helpful_vine_reviews_df = vine_reviews_df.select(["review_id", "helpful_votes"])
total_helpful_vine = helpful_vine_reviews_df.groupby().sum()
total_helpful_vine.show()

+------------------+
|sum(helpful_votes)|
+------------------+
|             20057|
+------------------+



In [59]:
# How many helpful votes did Non-Vine reviews recieve?
helpful_non_vine_reviews_df = non_vine_reviews_df.select(["review_id", "helpful_votes"])
total_helpful_non_vine = helpful_non_vine_reviews_df.groupby().sum()
total_helpful_non_vine.show()

+------------------+
|sum(helpful_votes)|
+------------------+
|           4347837|
+------------------+



In [60]:
# How many total votes did Vine reviews recieve?
votes_vine_reviews_df = vine_reviews_df.select(["review_id", "total_votes"])
total_votes_vine = votes_vine_reviews_df.groupby().sum()
total_votes_vine.show()

+----------------+
|sum(total_votes)|
+----------------+
|           23943|
+----------------+



In [61]:
# How many total votes did Non-Vine reviews recieve?
votes_non_vine_reviews_df = non_vine_reviews_df.select(["review_id", "total_votes"])
total_votes_non_vine = votes_non_vine_reviews_df.groupby().sum()
total_votes_non_vine.show()

+----------------+
|sum(total_votes)|
+----------------+
|         5099623|
+----------------+



## Write DataFrames to RDS

In [0]:
# Link to RDS
mode = "append"
jdbc_url = "jdbc:postgresql://amazon-reviews-db.cistst9iemrt.us-west-1.rds.amazonaws.com:5432/pet_products_db"
config = {
    "user": "root",
    "password": "Removed due to public upload",
    "driver": "org.postgresql.Driver"
}

In [0]:
# Write review ID table
review_id_df.write.jdbc(url=jdbc_url, table="review_id_table", mode=mode, properties=config)

In [0]:
# Write products table
products_df.write.jdbc(url=jdbc_url, table="products", mode=mode, properties=config)

In [0]:
# Write customers table
customers_df.write.jdbc(url=jdbc_url, table="customers", mode=mode, properties=config)

In [0]:
# Write vine table
vine_df.write.jdbc(url=jdbc_url, table="vine_table", mode=mode, properties=config)