In [0]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
# Start a SparkSession
import findspark
findspark.init()

In [3]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-03-25 16:48:49--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2020-03-25 16:48:50 (3.66 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ETLChallenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [5]:
#Extract
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Lawn_and_Garden_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
reveiw_data_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Lawn_and_Garden_v1_00.tsv.gz"), sep="\t", header=True)
reveiw_data_df.show(5)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   32787517| RED72VWWCOS7S|B008HDQYLQ|     348668413|Garden Weasel Gar...| Lawn and Garden|          1|            2|          8|   N|                Y|            One Star|I don't hate the ...| 2015-08-31|
|         US|   16374060| RZHWQ208LTEPV|B005OBZBD6|     264704759|10 Foot Mc4 Solar...| Lawn and Garden|          5|    

In [6]:
# Count the number of records (rows) in the dataset.
reveiw_data_df.count()

2557288

In [7]:
reveiw_data_df.dtypes

[('marketplace', 'string'),
 ('customer_id', 'string'),
 ('review_id', 'string'),
 ('product_id', 'string'),
 ('product_parent', 'string'),
 ('product_title', 'string'),
 ('product_category', 'string'),
 ('star_rating', 'string'),
 ('helpful_votes', 'string'),
 ('total_votes', 'string'),
 ('vine', 'string'),
 ('verified_purchase', 'string'),
 ('review_headline', 'string'),
 ('review_body', 'string'),
 ('review_date', 'string')]

In [0]:
# Transform
#Drop null values from the table
dropna_df=reveiw_data_df.dropna()

In [9]:
dropna_df.count()

2557005

In [28]:
#Drop duplicate values

df_dedupe = dropna_df.dropDuplicates(['product_id'])
df_reverse = dropna_df.sort((["product_id"]), ascending= False)
df_dedupe.join(df_reverse,['product_id'],'inner')
#Check for duplicates if any: df_dedupe.groupby(['product_id']).count().where('count > 1').sort('count', ascending=False).show()
df_dedupe.count()

282051

In [49]:
#Drop duplicate values
df_dedupe1 = df_dedupe.dropDuplicates(['customer_id'])
df_reverse1 = df_dedupe.sort((["customer_id"]), ascending= False)
df_dedupe1.join(df_reverse1,['customer_id'],'inner')
#Check for duplicates if any: df_dedupe1.groupby(['customer_id']).count().where('count > 1').sort('count', ascending=False).show()
df_dedupe1.count()

239332

In [52]:
#Clean the datatypes according to the SQL schema
cleaned_df = df_dedupe1.withColumn("customer_id",df_dedupe1["customer_id"].cast('int'))
cleaned_df = cleaned_df.withColumn("product_parent",cleaned_df["product_parent"].cast('int'))
cleaned_df = cleaned_df.withColumn("review_date",cleaned_df["review_date"].cast('date'))
cleaned_df = cleaned_df.withColumn("star_rating",cleaned_df["star_rating"].cast('int'))
cleaned_df = cleaned_df.withColumn("helpful_votes",cleaned_df["helpful_votes"].cast('int'))
cleaned_df = cleaned_df.withColumn("total_votes",cleaned_df["total_votes"].cast('int'))
cleaned_df.dtypes

[('marketplace', 'string'),
 ('customer_id', 'int'),
 ('review_id', 'string'),
 ('product_id', 'string'),
 ('product_parent', 'int'),
 ('product_title', 'string'),
 ('product_category', 'string'),
 ('star_rating', 'int'),
 ('helpful_votes', 'int'),
 ('total_votes', 'int'),
 ('vine', 'string'),
 ('verified_purchase', 'string'),
 ('review_headline', 'string'),
 ('review_body', 'string'),
 ('review_date', 'date')]

In [53]:
cleaned_df.count()

239332

In [54]:
#Customer_count in the SQL schema is not present in the input tsv file hence generating a new column with the same name and assiging row number.
from pyspark.sql.window import Window as W
from pyspark.sql import functions as F
cleaned_df = cleaned_df.withColumn("customer_count", F.monotonically_increasing_id())
windowSpec = W.orderBy("customer_count")
cleaned_df.withColumn("customer_count", F.row_number().over(windowSpec)).show()


+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+--------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|customer_count|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+--------------+
|         US|   10051712| R9KINVU1IPPSK|B00R4J5MXK|     837522199|Classic Accessori...| Lawn and Garden|          5|            0|          0|   N|                Y|This is the best ...|This is the best ...| 2015-08-28|             1|
|         US|   10071195| RE5A9O6LHLG2G|B004KZIRH4|     5061

In [55]:
cleaned_df.count()

239332

In [56]:
cleaned_df.dtypes

[('marketplace', 'string'),
 ('customer_id', 'int'),
 ('review_id', 'string'),
 ('product_id', 'string'),
 ('product_parent', 'int'),
 ('product_title', 'string'),
 ('product_category', 'string'),
 ('star_rating', 'int'),
 ('helpful_votes', 'int'),
 ('total_votes', 'int'),
 ('vine', 'string'),
 ('verified_purchase', 'string'),
 ('review_headline', 'string'),
 ('review_body', 'string'),
 ('review_date', 'date'),
 ('customer_count', 'bigint')]

In [57]:
cleaned_review_df = cleaned_df.select(["review_id", "customer_id", "product_id","product_parent", "review_date"])
cleaned_review_df.show(5)

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
| RLOCQ7LBE7456|   10000472|B00AEBIBCO|     642709840| 2015-08-16|
| R9KINVU1IPPSK|   10051712|B00R4J5MXK|     837522199| 2015-08-28|
| RE5A9O6LHLG2G|   10071195|B004KZIRH4|     506183384| 2015-07-10|
|R1GA2NEWA59GC6|   10083273|B002EP9NPC|     709527911| 2015-07-17|
| RDL4YQ44J3FF5|   10163080|B006VDIDOK|     325989478| 2013-06-20|
+--------------+-----------+----------+--------------+-----------+
only showing top 5 rows



In [58]:
cleaned_products_df = cleaned_df.select(["product_id", "product_title"])
cleaned_products_df.show(5)

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00AEBIBCO|Growstone Super S...|
|B00R4J5MXK|Classic Accessori...|
|B004KZIRH4|Woodstock Rainbow...|
|B002EP9NPC|Prime Line 7-0275...|
|B006VDIDOK|Large Granite Col...|
+----------+--------------------+
only showing top 5 rows



In [59]:
cleaned_customers_df = cleaned_df.select(["customer_id", "customer_count"])
cleaned_customers_df.show(5)

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   17413645|             0|
|   47796954|             1|
|   27484443|             2|
|   14168385|             3|
|   34371339|             4|
+-----------+--------------+
only showing top 5 rows



In [60]:
vine_table_df = cleaned_df.select(["review_id", "star_rating", "helpful_votes","total_votes", "vine"])
vine_table_df.show(5)

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
| RLOCQ7LBE7456|          5|            0|          0|   N|
| R9KINVU1IPPSK|          5|            0|          0|   N|
| RE5A9O6LHLG2G|          5|            0|          0|   N|
|R1GA2NEWA59GC6|          5|            0|          0|   N|
| RDL4YQ44J3FF5|          5|            0|          0|   N|
+--------------+-----------+-------------+-----------+----+
only showing top 5 rows



In [0]:
# Load
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://database-1.coxa50m0widb.us-east-2.rds.amazonaws.com:5432/challenge_db"
config = {"user":"postgres",
          "password": "Asdf1234*",
          "driver":"org.postgresql.Driver"}

In [0]:
# Write DataFrame to review_id_table in RDS
cleaned_review_df.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

In [0]:
# Write DataFrame to products table in RDS
cleaned_products_df.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)

In [0]:
# Write DataFrame to customers table in RDS
cleaned_customers_df.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)

In [0]:
# Write DataFrame to vine_table in RDS
vine_table_df.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)