<a href="https://colab.research.google.com/github/pqrt12/BigData/blob/master/Challenge_ETL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Install Java, Spark, Findspark, and the PostgreSQL JDBC driver

In [0]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-05-03 17:28:21--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.1’


2020-05-03 17:28:21 (4.76 MB/s) - ‘postgresql-42.2.9.jar.1’ saved [914037/914037]



In [0]:
from pyspark.sql import SparkSession

# Build (or reuse) the session with the PostgreSQL JDBC jar on the driver classpath.
spark = (
    SparkSession.builder
    .appName("ChallengeETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

## Config

In [4]:
# Upload "challenge_config.py" through the browser file picker
# (alternatively, use the "Upload" button in Colab's left-hand pane).
from google.colab import files
# Binding the result to a name keeps it from being echoed as the cell's output.
temp = files.upload()

Saving challenge_config.py to challenge_config.py


In [0]:
# Connection settings live outside the notebook in challenge_config.py,
# which must define the four names imported below, for example:
#   aws_postgres_host_port = "dataviz.xxx...yyy.amazonaws.com:5432"
#   challenge_db           = "challenge"
#   aws_postgre_username   = "postgres"
#   aws_postgre_password   = "securePassword"
from challenge_config import (
    aws_postgres_host_port,
    aws_postgre_username,
    aws_postgre_password,
    challenge_db,
)

In [0]:
# Delete the uploaded config file now that its values have been imported.
# NOTE(review): Python may have cached the module's bytecode under __pycache__/ —
# confirm whether that copy should be removed as well.
!rm challenge_config.py

## Extract



In [0]:
# Fetch the review dataset from the public S3 bucket and load it into Spark.
from pyspark import SparkFiles

aws_s3_base_url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/"
# Use "sample_us.tsv" instead for a quick run on a small sample.
filename = "amazon_reviews_us_Wireless_v1_00.tsv.gz"

# addFile() downloads the file; SparkFiles.get() resolves its local path.
spark.sparkContext.addFile(aws_s3_base_url + filename)
raw_data_df = (
    spark.read
    .options(sep="\t", header=True, inferSchema=True)
    .csv(SparkFiles.get(filename))
)

In [8]:
# Count the total number of records in the raw dataset.
raw_data_df.count()

9002021

In [9]:
# Preview the first three rows of the raw DataFrame.
raw_data_df.show(3)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   16414143|R3W4P9UBGNGH1U|B00YL0EKWE|     852431543|LG G4 Case Hard T...|        Wireless|          2|            1|          3|   N|                Y|Looks good, funct...|2 issues  -  Once...|2015-08-31 00:00:00|
|         US|   50800750|R15V54KBMTQWAY|B00XK95RPQ|     516894650|Selfie Stick Fibl...| 

In [10]:
# describe() returns a new DataFrame; without .show() the cell only displays
# that DataFrame's column list, not the summary statistics themselves.
raw_data_df.describe()

DataFrame[summary: string, marketplace: string, customer_id: string, review_id: string, product_id: string, product_parent: string, product_title: string, product_category: string, star_rating: string, helpful_votes: string, total_votes: string, vine: string, verified_purchase: string, review_headline: string, review_body: string]

In [11]:
# Print the column types that inferSchema derived for the raw data.
raw_data_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



## Transform

### Transform the whole DataFrame:
*   convert `review_date` from a timestamp to a date
*   keep only the columns needed by the output tables
*   drop rows with a duplicate `review_id`
*   drop rows containing null values

In [12]:
from pyspark.sql.functions import to_date

# Columns required by the output tables; everything else is discarded.
columns_keep = [
    'customer_id',
    'review_id',
    'product_id',
    'product_parent',
    'product_title',
    'star_rating',
    'helpful_votes',
    'total_votes',
    'vine',
    'review_date',
]

# review_date is already a timestamp (see the printed schema), so a plain
# to_date() is all that is needed to truncate it to a date. The former
# "yyyy_MM_dd" pattern did not match the data's "yyyy-MM-dd" layout and would
# have produced nulls had the column been read as a string.
clean_data_df = (
    raw_data_df
    .withColumn("review_date", to_date("review_date"))
    .select(columns_keep)
    .dropDuplicates(['review_id'])   # keep one row per review
    .dropna()                        # drop rows with any null in the kept columns
)
clean_data_df.count()

9001590

In [13]:
# Preview the first three rows of the cleaned DataFrame.
clean_data_df.show(3)

+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+-----------+
|customer_id|     review_id|product_id|product_parent|       product_title|star_rating|helpful_votes|total_votes|vine|review_date|
+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+-----------+
|   12054485|R1002OCUHG9RB2|B00822KA0S|     678284285|iPhone 4 Premium ...|          4|            0|          0|   N| 2013-11-13|
|   13899950|R1009FXGIMJ5A8|B001OD2OAQ|     177396772|BlackBerry Curve ...|          1|            0|          2|   N| 2011-09-08|
|     513891|R100B5W7AF3U5Y|B00NIEPGFE|     906325961|BLU Zoey II T276 ...|          5|            0|          2|   N| 2015-04-07|
+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+-----------+
only showing top 3 rows



In [14]:
# Confirm the column types after cleaning (review_date should now be a date).
clean_data_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- review_date: date (nullable = true)



### Prepare one DataFrame per PostgreSQL table

In [15]:
# One DataFrame per PostgreSQL table; this one links each review to its
# customer, product and date.
review_id_table_df = clean_data_df.select(
    'review_id',
    'customer_id',
    'product_id',
    'product_parent',
    'review_date',
)
review_id_table_df.show(3)

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1002OCUHG9RB2|   12054485|B00822KA0S|     678284285| 2013-11-13|
|R1009FXGIMJ5A8|   13899950|B001OD2OAQ|     177396772| 2011-09-08|
|R100B5W7AF3U5Y|     513891|B00NIEPGFE|     906325961| 2015-04-07|
+--------------+-----------+----------+--------------+-----------+
only showing top 3 rows



In [16]:
# Distinct (product_id, product_title) pairs.
# (An equivalent alternative is groupBy('product_id', 'product_title').count()
# followed by selecting the two key columns.)
products_df = (
    clean_data_df
    .select('product_id', 'product_title')
    .distinct()
)
products_df.show(3)

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00I3JQA6I|Sennheiser Blueto...|
|B00RLKR91K|iPhone 6S Case Cr...|
|B00BT8L2MW|Tech Armor Apple ...|
+----------+--------------------+
only showing top 3 rows



In [17]:
# Number of reviews per customer.
reviews_per_customer = clean_data_df.groupBy('customer_id').count()
customers_df = reviews_per_customer.withColumnRenamed('count', 'customer_count')
customers_df.show(3)

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   14341842|            16|
|   12939836|             1|
|   21165990|             4|
+-----------+--------------+
only showing top 3 rows



In [18]:
# Rating and vote columns for each review, together with its vine flag.
vine_table_df = clean_data_df.select(
    'review_id',
    'star_rating',
    'helpful_votes',
    'total_votes',
    'vine',
)
vine_table_df.show(3)

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R1002OCUHG9RB2|          4|            0|          0|   N|
|R1009FXGIMJ5A8|          1|            0|          2|   N|
|R100B5W7AF3U5Y|          5|            0|          2|   N|
+--------------+-----------+-------------+-----------+----+
only showing top 3 rows



## Load

In [0]:
# JDBC settings for the RDS PostgreSQL instance.
# Save mode used by every write below; switch to "append" to keep existing rows.
mode = "overwrite"

jdbc_url = "".join(["jdbc:postgresql://", aws_postgres_host_port, "/", challenge_db])

# Connection properties handed to Spark's JDBC writer.
config = dict(
    user=aws_postgre_username,
    password=aws_postgre_password,
    driver="org.postgresql.Driver",
)

In [0]:
# Write the review lookup rows to the "review_id_table" table.
review_id_table_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table='review_id_table',
    properties=config,
)

In [0]:
# Write the product rows to the "products" table.
products_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table='products',
    properties=config,
)

In [0]:
# Write the per-customer review counts to the "customers" table.
customers_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table='customers',
    properties=config,
)

In [0]:
# Write the rating/vote rows to the "vine_table" table.
vine_table_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table='vine_table',
    properties=config,
)