# **Amazon Reviews: Baby ETL**

## Setup

In [14]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:6 http://security.ubuntu.com/ubuntu bionic-security InRelease
Hit:7 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
Hit:8 http://archive.ubuntu.com/ubuntu bionic-backports InRelease
Hit:9 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Hit:13 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Reading package lists... Done


In [15]:
# Connect to Postgres
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-06-02 01:01:40--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.1’


2022-06-02 01:01:40 (10.4 MB/s) - ‘postgresql-42.2.9.jar.1’ saved [914037/914037]



## Extract

In [3]:
# Create (or reuse) the Spark session, with the PostgreSQL JDBC jar on the driver classpath
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("BabyCloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [4]:
# Download the gzipped, tab-separated Baby reviews file and read it into a DataFrame
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Baby_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# Resolve the local path of the file that was just added, then parse it
# with the first line treated as the header row.
local_tsv_path = SparkFiles.get("amazon_reviews_us_Baby_v1_00.tsv.gz")
tsv_reader = spark.read.option('header', 'true')
df = tsv_reader.csv(local_tsv_path, sep="\t")
df.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|    9970739| R8EWA1OFT84NX|B00GSP5D94|     329991347|Summer Infant Swa...|            Baby|          5|            0|          0|   N|                Y|Great swaddled bl...|Loved these swadd...| 2015-08-31|
|         US|   23538442|R2JWY4YRQD4FOP|B00YYDDZGU|     646108902|Pacifier Clip Gir...|            Baby|          5|    

## Transform

In [5]:
# Clean the raw reviews: remove rows containing nulls, then exact duplicate rows.
# The row count is printed before cleaning and again after each step.
rows_before_cleaning = df.count()
print(rows_before_cleaning)

df = df.dropna()
rows_without_nulls = df.count()
print(rows_without_nulls)

df = df.dropDuplicates()
rows_deduplicated = df.count()
print(rows_deduplicated)

1752932
1752727
1752727


In [6]:
# Examine the schema.
# The file was read without a schema, and the output below lists every column as
# a string, which is why later cells cast columns before writing to the database.
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [7]:
# Build the Review IDs DataFrame: review id plus its customer, product and date columns.
# Row counts are printed after the select, after dropping nulls, and after dropping duplicates.
review_columns = ["review_id", "customer_id", "product_id", "product_parent", "review_date"]
review_id_df = df.select(review_columns)
print(review_id_df.count())

review_id_df = review_id_df.dropna()
print(review_id_df.count())

review_id_df = review_id_df.dropDuplicates()
print(review_id_df.count())

review_id_df.show(10)

1752727
1752727
1752727
+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R13Q4MMJH7GQ9I|   22498025|B003553WJ8|     644988403| 2011-06-08|
|R1A541O3CUQKN3|   51743963|B0073WXL4A|     315550434| 2014-03-12|
|R1E8I8MMF6REMA|   18664275|B0015V3NPM|     560204418| 2013-11-04|
|R1FO136CP9CUMP|   41958721|B002TOKHJE|     777225859| 2014-03-25|
|R1LDIKE44UJIVZ|   23553323|B002YQQQJW|     235815365| 2011-09-09|
|R1M0DNTRUO05E9|    6072387|B00I0M8SAY|     856944782| 2015-01-10|
|R1M6CRJBTITM7M|   34016610|B00DNBA82S|     631647699| 2014-09-26|
|R1PZM9YVKV3BPF|   38600252|B008OIZ0D2|     667344221| 2014-03-07|
|R1UHQOFV5Q1IB3|   16148758|B0044R7I1O|     767828174| 2012-04-04|
|R1ZXTSPW2HJA9Y|   13819361|B004AHMCKA|     354866278| 2012-02-01|
+--------------+-----------+----------+--------------+-----------+
only showing top 10 rows



In [8]:
# Adjust data types to match DB
# Import only the type classes this notebook uses instead of `import *`, so it is
# clear where each name comes from and nothing else is pulled into the namespace.
from pyspark.sql.types import DateType, IntegerType

# Every column was read as a string; convert the numeric ids and the date column.
review_id_df = (
    review_id_df
    .withColumn("customer_id", review_id_df["customer_id"].cast(IntegerType()))
    .withColumn("product_parent", review_id_df["product_parent"].cast(IntegerType()))
    .withColumn("review_date", review_id_df["review_date"].cast(DateType()))
)
review_id_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: date (nullable = true)



In [9]:
# Build the Products DataFrame from the product id and title columns.
# NOTE(review): duplicates are removed on the (product_id, product_title) pair, so a
# product_id that appears with two different titles would be kept twice — confirm
# that the target table allows this.
product_columns = ["product_id", "product_title"]
products_df = df.select(product_columns)
print(products_df.count())

products_df = products_df.dropna()
print(products_df.count())

products_df = products_df.dropDuplicates()
print(products_df.count())

products_df.show(10)

1752727
1752727
160168
+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B000GAYYCE|Munchkin Diaper C...|
|B00005BTNA|Playtex 3 Pack BP...|
|B001M4HPVK|Luna Lullaby Boso...|
|B0000696D6|Lambs Ivy Jake Di...|
|B00BXE5NWW|NUK Gerber Gradua...|
|B001TWI81G|Thermos Reusable ...|
|B00R1FZ2SC|BUBM Universal Ca...|
|B0069JBKHI|Withings Smart Ba...|
|B0044FGIY4|Itzy Ritzy Infant...|
|B002UXQRM0|NUK Active Silico...|
+----------+--------------------+
only showing top 10 rows



In [10]:
# Build the Customers DataFrame: number of rows in df per customer_id,
# stored in a column named customer_count.
customers_df = (
    df.select("customer_id")
    .groupby("customer_id")
    .count()
    .withColumnRenamed("count", "customer_count")
)
print(customers_df.count())
customers_df.show(10)

961990
+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   39242515|             1|
|   26484421|             6|
|   39733825|             1|
|   49860153|             1|
|   12610137|             8|
|   13936007|             1|
|   18255784|             5|
|   21104872|             1|
|   38203060|             5|
|   43996862|             3|
+-----------+--------------+
only showing top 10 rows



In [11]:
# Cast both customer columns to integers to match the DB column types
customer_id_as_int = customers_df["customer_id"].cast(IntegerType())
customer_count_as_int = customers_df["customer_count"].cast(IntegerType())

customers_df = (
    customers_df
    .withColumn("customer_id", customer_id_as_int)
    .withColumn("customer_count", customer_count_as_int)
)
customers_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: integer (nullable = false)



In [12]:
# Build the Vines DataFrame: rating, vote counts and vine flag for each review id.
# Row counts are printed after the select, after dropping nulls, and after dropping duplicates.
vine_columns = ["review_id", "star_rating", "helpful_votes", "total_votes", "vine"]
vines_df = df.select(vine_columns)
print(vines_df.count())

vines_df = vines_df.dropna()
print(vines_df.count())

vines_df = vines_df.dropDuplicates()
print(vines_df.count())

vines_df.show(10)

1752727
1752727
1752727
+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R14CHUYC6L76TS|          5|            0|          0|   N|
|R16ZA9U26WDFQ1|          5|            1|          1|   N|
|R18G4XRUTSKDUM|          1|            0|          0|   N|
|R18TZRL0YMJ0FX|          4|            0|          0|   N|
|R1BKBTRGUU6XRY|          5|            0|          0|   N|
|R1BM4E7D2F20TC|          5|            6|          6|   N|
|R1D76HRB79TZ99|          5|            0|          0|   N|
|R1HOUU6VJN1L92|          3|            0|          1|   N|
|R1HZ9ZRMHZI57Z|          5|            0|          0|   N|
|R1M5E9092Q14K1|          5|            0|          0|   N|
+--------------+-----------+-------------+-----------+----+
only showing top 10 rows



In [13]:
# Cast the rating and vote columns to integers to match the DB column types;
# review_id and vine are left as strings.
star_rating_as_int = vines_df["star_rating"].cast(IntegerType())
helpful_votes_as_int = vines_df["helpful_votes"].cast(IntegerType())
total_votes_as_int = vines_df["total_votes"].cast(IntegerType())

vines_df = (
    vines_df
    .withColumn("star_rating", star_rating_as_int)
    .withColumn("helpful_votes", helpful_votes_as_int)
    .withColumn("total_votes", total_votes_as_int)
)
vines_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)



## Load

In [16]:
# Configuration for RDS instance
import os

# Save mode passed to every JDBC write below.
mode = "append"
jdbc_url = "jdbc:postgresql://nicolefirstdatabase.cmrhlgsp7vsd.us-east-2.rds.amazonaws.com:5432/awsdatabase"
config = {
    "user": "postgres",
    # Read the password from the RDS_PASSWORD environment variable so the secret is
    # never saved in the notebook; defaults to the empty string that was
    # previously hard-coded here.
    "password": os.environ.get("RDS_PASSWORD", ""),
    "driver": "org.postgresql.Driver",
}


In [17]:
# Write the Review IDs DataFrame to the "review_id_table" table over JDBC
review_id_df.write.jdbc(
    url=jdbc_url,
    table="review_id_table",
    mode=mode,
    properties=config,
)

In [18]:
# Write the Products DataFrame to the "products" table over JDBC
products_df.write.jdbc(
    url=jdbc_url,
    table="products",
    mode=mode,
    properties=config,
)

In [19]:
# Write the Customers DataFrame to the "customers" table over JDBC
customers_df.write.jdbc(
    url=jdbc_url,
    table="customers",
    mode=mode,
    properties=config,
)

In [20]:
# Write the Vines DataFrame to the "vine_table" table over JDBC
vines_df.write.jdbc(
    url=jdbc_url,
    table="vine_table",
    mode=mode,
    properties=config,
)