<a href="https://colab.research.google.com/github/piruzalemi/Big_Data_Challenge/blob/master/alemi_ETL_RDS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [31]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-04-05 20:56:19--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.1’


2020-04-05 20:56:19 (8.78 MB/s) - ‘postgresql-42.2.9.jar.1’ saved [914037/914037]



In [0]:
from pyspark.sql import SparkSession

# Create (or reuse) the "CloudETL" session.
# The PostgreSQL JDBC jar downloaded above goes on the driver classpath so the
# "org.postgresql.Driver" class used by the JDBC writes can be loaded.
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [33]:
from pyspark import SparkFiles

# Load the Amazon customer-reviews sample (tab-separated, with a header row)
# from S3 into a DataFrame.
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/sample_us.tsv"
spark.sparkContext.addFile(url)

# addFile makes the download available under the URL's last path component,
# so derive that name from `url` instead of repeating it by hand.
local_name = url.rsplit("/", 1)[-1]

# review_date is written as year-month-day (e.g. 2015-08-31). In Spark's
# Java-style date patterns "MM" is month and "mm" is minutes.
df = spark.read.csv(
    SparkFiles.get(local_name),
    header=True,
    sep="\t",
    inferSchema=True,
    timestampFormat="yyyy-MM-dd",
)
df.show(5)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   18778586| RDIJS7QYB6XNR|B00EDBY7X8|     122952789|Monopoly Junior B...|            Toys|          5|            0|          0|   N|                Y|          Five Stars|        Excellent!!!|2015-08-31 00:00:00|
|         US|   24769659|R36ED1U38IELG8|B00D7JFOPC|     952062646|56 Pieces of Wood...| 

## Drop duplicates and incomplete rows

In [34]:
# Clean the reviews in two steps: drop rows containing nulls, then drop exact
# duplicate rows. The row count is printed before cleaning and after each
# step, so the effect of every step is visible.
cleaning_steps = (
    lambda frame: frame.dropna(),
    lambda frame: frame.dropDuplicates(),
)

print(df.count())
for clean in cleaning_steps:
    df = clean(df)
    print(df.count())

49
49
49


## Examine the schema

In [35]:
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



## Rename columns

In [36]:
# Column renames to apply, as {current name: new name}; add entries here to
# rename more columns.
column_renames = {"product_id": "product_code"}

df1 = df
for old_name, new_name in column_renames.items():
    df1 = df1.withColumnRenamed(old_name, new_name)
df1.show(5)

+-----------+-----------+--------------+------------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_code|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+------------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   44331596| R1UE3RPRGCOLD|  B002LHA74O|     818126353|Super Jumbo Playi...|            Toys|          2|            1|          1|   N|                Y|           Two Stars|Cards are not as ...|2015-08-31 00:00:00|
|         US|   18409006|R1HOJ5GOA2JWM0|  B00L71H0F4|     692305292|Barkology Pr

## Create a new DataFrame for market info

In [44]:
# Product, rating and review-date columns (using the renamed product_code).
market_columns = ["product_code", "product_title", "star_rating", "vine", "review_date"]
market_info = df1.select(*market_columns)
market_info.show(5)

+------------+--------------------+-----------+----+-------------------+
|product_code|       product_title|star_rating|vine|        review_date|
+------------+--------------------+-----------+----+-------------------+
|  B002LHA74O|Super Jumbo Playi...|          2|   N|2015-08-31 00:00:00|
|  B00L71H0F4|Barkology Princes...|          2|   N|2015-08-31 00:00:00|
|  B00407S11Y|Fun Express Insec...|          5|   N|2015-08-31 00:00:00|
|  B00XPWXYDK|ZuZo 2.4GHz 4 CH ...|          5|   N|2015-08-31 00:00:00|
|  B000PEOMC8|Intex River Run I...|          5|   N|2015-08-31 00:00:00|
+------------+--------------------+-----------+----+-------------------+
only showing top 5 rows



## Write DataFrame to RDS

In [0]:
# Configuration for RDS instance
import getpass
import os

mode = "append"
jdbc_url = "jdbc:postgresql://alemi-database4.csa2ekppqcl0.us-east-2.rds.amazonaws.com:5432/alemi-db4"

# Never hardcode the database password in the notebook, where it gets saved
# and shared with the file. Read it from the RDS_PASSWORD environment variable
# if set, otherwise prompt for it without echoing.
db_password = os.environ.get("RDS_PASSWORD") or getpass.getpass("RDS password: ")

config = {"user": "postgres",
          "password": db_password,
          "driver": "org.postgresql.Driver"}

In [0]:
# Write the market_info DataFrame built above (product_code, product_title,
# star_rating, vine, review_date) to the Alemi_ETL_info table. This mirrors how
# df3 is written to market_star_rating further down.
# NOTE(review): if Alemi_ETL_info already exists with a different column layout
# (e.g. the full review schema), the append is likely to fail. Recreate the
# table first in that case.
market_info.write.jdbc(url=jdbc_url, table='Alemi_ETL_info', mode=mode, properties=config)

## Create a new DataFrame for Market Star Rating

In [46]:
# Keep only the product identifier, title and star rating for the
# market_star_rating table.
star_rating_columns = ["product_id", "product_title", "star_rating"]
df3 = df.select(*star_rating_columns)
df3.show(5)

+----------+--------------------+-----------+
|product_id|       product_title|star_rating|
+----------+--------------------+-----------+
|B002LHA74O|Super Jumbo Playi...|          2|
|B00L71H0F4|Barkology Princes...|          2|
|B00407S11Y|Fun Express Insec...|          5|
|B00XPWXYDK|ZuZo 2.4GHz 4 CH ...|          5|
|B000PEOMC8|Intex River Run I...|          5|
+----------+--------------------+-----------+
only showing top 5 rows



## Write the Market Star Rating DataFrame to RDS

In [0]:
# Save df3 to the market_star_rating table, using the write mode, JDBC URL and
# connection properties from the RDS configuration cell.
star_rating_writer = df3.write.mode(mode)
star_rating_writer.jdbc(jdbc_url, "market_star_rating", properties=config)