In [1]:
import os
# Choose a Spark release published under http://www.apache.org/dist/spark/ and enter it as the spark version.
# The same string is reused below for the download URL, the archive name and SPARK_HOME.
# For example:
spark_version = 'spark-3.1.3'
# spark_version = 'spark-3.<version number>'
# Exported so that the shell commands below can expand $SPARK_VERSION.
os.environ['SPARK_VERSION']=spark_version

# Install Java, download and unpack the Spark distribution, and install findspark
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
# NOTE(review): both paths assume a Colab-style machine (JDK 8 under /usr/lib/jvm and the
# archive unpacked in /content) — confirm before running elsewhere.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Initialise findspark; the SparkSession itself is created in a later cell.
import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Connecting to security.ubuntu.com (185.125.190.39)] [Waiting for headers] [                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
                                                                               Hit:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.39)] [0% [1 InRelease gpgv 242 kB] [Waiting for headers] [Connecting to security.ubun                                                                               Hit:4 http://archive.ubuntu.com/ubuntu bionic-backports InRelease
0% [1 InRelease gpgv 242 kB] [Connecting to security.ubuntu.com (185.125.190.39                                                                               Hit:5 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic I

In [2]:
# Download the PostgreSQL JDBC driver jar (no database connection is opened in this cell)
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar


--2022-08-19 16:55:44--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2022-08-19 16:55:45 (5.01 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
from pyspark.sql import SparkSession

# Build (or reuse) the session for this ETL job, with the downloaded
# PostgreSQL driver jar on the driver classpath.
spark = (
    SparkSession.builder
    .appName("DigitalETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

## ***Extract the data***

In [4]:
# Register the Wireless reviews TSV from the S3 bucket with Spark and load it
from pyspark import SparkFiles

url = "https://my-data-class-burketi.s3.amazonaws.com/amazon_reviews_us_Wireless_v1_00.tsv"
spark.sparkContext.addFile(url)

# Resolve the local copy of the registered file, then parse it as a
# tab-separated file with a header row and an inferred schema.
local_tsv_path = SparkFiles.get("amazon_reviews_us_Wireless_v1_00.tsv")
wireless_reviews_df = spark.read.csv(
    local_tsv_path,
    sep='\t',
    header=True,
    inferSchema=True,
    timestampFormat="yyyy/MM/dd",
)

wireless_reviews_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   16414143|R3W4P9UBGNGH1U|B00YL0EKWE|     852431543|LG G4 Case Hard T...|        Wireless|          2|            1|          3|   N|                Y|Looks good, funct...|2 issues  -  Once...| 2015-08-31|
|         US|   50800750|R15V54KBMTQWAY|B00XK95RPQ|     516894650|Selfie Stick Fibl...|        Wireless|          4|    

# Transform the data

In [5]:
# Count the rows of the raw reviews DataFrame and report the total
row_total = wireless_reviews_df.count()
f"The number of rows in this dataset is {row_total}"

'The number of rows in this dataset is 9002021'

In [6]:
# Report how many columns the raw reviews DataFrame has
column_total = len(wireless_reviews_df.columns)
f"the number of columns in this dataset are {column_total}"

'the number of columns in this dataset are 15'

In [7]:
# Show the column names of the raw reviews DataFrame.
# The value interpolated here is the list of names, not a count, so the
# message is worded accordingly.
f"the columns in this dataset are {wireless_reviews_df.columns}"

"the number of columns are ['marketplace', 'customer_id', 'review_id', 'product_id', 'product_parent', 'product_title', 'product_category', 'star_rating', 'helpful_votes', 'total_votes', 'vine', 'verified_purchase', 'review_headline', 'review_body', 'review_date']"

## Transform the data to fit into the schema

A Postgres SQL database has been created, and it has 4 tables. In this section, 4 DataFrames are created; these DataFrames will be loaded into the SQL database. The data from Spark is therefore transformed to ensure that it is compatible with the tables in the Postgres database.

In [8]:
# Print the schema of the raw reviews DataFrame to check each column's data type
wireless_reviews_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [9]:
# Cast columns to integer/date types and keep the result in a new DataFrame,
# leaving wireless_reviews_df untouched.
# In the schema printed above, star_rating and review_date are strings; the
# other four casts target columns that are already integers and simply pin
# their type.

# Import only the two type classes used in this notebook instead of a wildcard,
# so it is clear where IntegerType and DateType come from.
from pyspark.sql.types import DateType, IntegerType

wireless_reviews = (
    wireless_reviews_df
    .withColumn("customer_id", wireless_reviews_df["customer_id"].cast(IntegerType()))
    .withColumn("product_parent", wireless_reviews_df["product_parent"].cast(IntegerType()))
    .withColumn("star_rating", wireless_reviews_df["star_rating"].cast(IntegerType()))
    .withColumn("helpful_votes", wireless_reviews_df["helpful_votes"].cast(IntegerType()))
    .withColumn("total_votes", wireless_reviews_df["total_votes"].cast(IntegerType()))
    .withColumn("review_date", wireless_reviews_df["review_date"].cast(DateType()))
)

In [10]:
# Print the schema of the cast DataFrame to confirm the new column types
wireless_reviews.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)



In [11]:
# Build review_id_df: the columns that will be sent to review_id_table
review_id_columns = ["review_id", "customer_id", "product_id", "product_parent", "review_date"]
review_id_df = wireless_reviews.select(review_id_columns)
review_id_df.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R3W4P9UBGNGH1U|   16414143|B00YL0EKWE|     852431543| 2015-08-31|
|R15V54KBMTQWAY|   50800750|B00XK95RPQ|     516894650| 2015-08-31|
| RY8I449HNXSVF|   15184378|B00SXRXUKO|     984297154| 2015-08-31|
|R18TLJYCKJFLSR|   10203548|B009V5X1CE|     279912704| 2015-08-31|
|R1NK26SWS53B8Q|     488280|B00D93OVF0|     662791300| 2015-08-31|
|R11LOHEDYJALTN|   13334021|B00XVGJMDQ|     421688488| 2015-08-31|
|R3ALQVQB2P9LA7|   27520697|B00KQW1X1C|     554285554| 2015-08-31|
|R3MWLXLNO21PDQ|   48086021|B00IP1MQNK|     488006702| 2015-08-31|
|R2L15IS24CX0LI|   12738196|B00HVORET8|     389677711| 2015-08-31|
|R1DJ8976WPWVZU|   15867807|B00HX3G6J6|     299654876| 2015-08-31|
|R3MRWNNR8CBTB7|    1972249|B00U4NATNQ|     577878727| 2015-08-31|
|R1DS6DKTUXAQK3|   10956619|B00SZEFDH8|     654620704| 2015-08

In [12]:
# Build products_id_df: the product id and title columns that will be sent to the products table
product_columns = ["product_id", "product_title"]
products_id_df = wireless_reviews.select(product_columns)
products_id_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00YL0EKWE|LG G4 Case Hard T...|
|B00XK95RPQ|Selfie Stick Fibl...|
|B00SXRXUKO|Tribe AB40 Water ...|
|B009V5X1CE|RAVPower® Element...|
|B00D93OVF0|Fosmon Micro USB ...|
|B00XVGJMDQ|iPhone 6 Case, Vo...|
|B00KQW1X1C|Nokia Lumia 630 R...|
|B00IP1MQNK|Lumsing 10400mah ...|
|B00HVORET8|iPhone 5S Battery...|
|B00HX3G6J6|HTC One M8 Screen...|
|B00U4NATNQ|S6 Case - Bear Mo...|
|B00SZEFDH8|BLU Studio X, Unl...|
|B00JRJUL9U|EZOPower 5-Port U...|
|B00KQ4T0HE|iPhone 6S Case &i...|
|B00M0YWKPM| iPhone 6s Plus Case|
|B00KDZEE68|zBoost ZB575-A TR...|
|B00BJN45GM|OtterBox Defender...|
|B00SA86SXW|Aduro PowerUP 30W...|
|B00Q3I68TU|LilGadgets Connec...|
|B00TN4J1TA|Anker Aluminum Mu...|
+----------+--------------------+
only showing top 20 rows



In [13]:
# Drop duplicate rows from products_id_df so each product_id appears once.
# Duplicates are judged on product_id only, not on product_title.
products_id_df = products_id_df.dropDuplicates(["product_id"])

In [14]:
# Row count of products_id_df after de-duplication
products_id_df.count()

906592

In [15]:
# Sanity check: fail if de-duplicating on product_id would still remove rows
total_rows = products_id_df.count()
unique_rows = products_id_df.dropDuplicates(["product_id"]).count()
if total_rows > unique_rows:
    raise ValueError('Data has duplicates')

In [16]:
# Print the schema of the de-duplicated products DataFrame
products_id_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_title: string (nullable = true)



In [17]:
# Build customers_df: one row per customer_id with the number of reviews
# recorded for that customer, exposed as customer_count.
reviews_per_customer = wireless_reviews.groupby("customer_id").agg({"customer_id": "count"})
customers_df = reviews_per_customer.withColumnRenamed("count(customer_id)", "customer_count")
customers_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   46909180|             6|
|   42560427|             7|
|   43789873|             3|
|   22037526|             2|
|   34220092|             2|
|   42801586|             1|
|    9565734|             2|
|   15829398|             1|
|   38247118|             1|
|   32478248|             2|
|   48114630|             1|
|   23085063|             1|
|   32787070|             3|
|   43515569|             1|
|    4919528|             2|
|    5088547|             2|
|   41852407|             3|
|   49703087|             1|
|   12713799|             1|
|   36728141|             8|
+-----------+--------------+
only showing top 20 rows



In [18]:
# Print the customers DataFrame schema to check the type of the aggregated customer_count column
customers_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: long (nullable = false)



In [19]:
# The aggregation produced customer_count as a long; narrow it to an integer
# and print the schema to confirm the change.
customer_count_as_int = customers_df["customer_count"].cast(IntegerType())
customers_df = customers_df.withColumn("customer_count", customer_count_as_int)
customers_df.printSchema()


root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: integer (nullable = false)



In [20]:
# Build vine_df: the rating, vote and vine columns that will be sent to vine_table
vine_columns = ["review_id","star_rating","helpful_votes", "total_votes","vine"]
vine_df = wireless_reviews.select(vine_columns)
vine_df.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R3W4P9UBGNGH1U|          2|            1|          3|   N|
|R15V54KBMTQWAY|          4|            0|          0|   N|
| RY8I449HNXSVF|          5|            0|          0|   N|
|R18TLJYCKJFLSR|          5|            0|          0|   N|
|R1NK26SWS53B8Q|          5|            0|          0|   N|
|R11LOHEDYJALTN|          5|            0|          0|   N|
|R3ALQVQB2P9LA7|          4|            0|          0|   N|
|R3MWLXLNO21PDQ|          5|            0|          0|   N|
|R2L15IS24CX0LI|          5|            0|          0|   N|
|R1DJ8976WPWVZU|          3|            0|          0|   N|
|R3MRWNNR8CBTB7|          5|            0|          0|   N|
|R1DS6DKTUXAQK3|          5|            0|          0|   N|
| RWJM5E0TWUJD2|          5|            0|          0|   N|
|R1XTJKDYNCRGAC|          1|            

In [21]:
# Print the schema of the vine DataFrame
vine_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)



# Load data to Postgres Database

In [22]:
# Switch on the Arrow execution setting for PySpark on this session.
# NOTE(review): no pandas conversion is visible in this notebook, so this setting may be unnecessary — confirm.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [23]:
# Configure settings for RDS
import os
import warnings

# Save mode passed to every JDBC write below.
mode = "append"
jdbc_url="jdbc:postgresql://database-bootcamp.cgidpfd2p1nl.us-east-1.rds.amazonaws.com:5432/amazon_wireless_reviews_database"

# Credentials are read from the environment so that no secret is stored in the
# notebook or its version history. Set RDS_PASSWORD (and optionally RDS_USER,
# which defaults to "root") before running this cell.
rds_password = os.environ.get("RDS_PASSWORD", "")
if not rds_password:
    # Warn here instead of failing later with a less obvious authentication error.
    warnings.warn("RDS_PASSWORD is not set; the JDBC writes will not be able to authenticate until it is provided.")

config = {"user": os.environ.get("RDS_USER", "root"),
          "password": rds_password,
          "driver":"org.postgresql.Driver"}


In [24]:
# Send customers_df to the customers table in RDS over JDBC
customers_df.write.jdbc(
    url=jdbc_url,
    table="customers",
    mode=mode,
    properties=config,
)


In [25]:
# Send review_id_df to review_id_table in RDS over JDBC
review_id_df.write.jdbc(
    url=jdbc_url,
    table="review_id_table",
    mode=mode,
    properties=config,
)


In [26]:
# Send vine_df to vine_table in RDS over JDBC
vine_df.write.jdbc(
    url=jdbc_url,
    table="vine_table",
    mode=mode,
    properties=config,
)

In [27]:
# Send the de-duplicated products_id_df to the products table in RDS over JDBC
products_id_df.write.jdbc(
    url=jdbc_url,
    table="products",
    mode=mode,
    properties=config,
)
