<a href="https://colab.research.google.com/github/susiexia/BigData_Amazon_reviews_ETL_Cloud/blob/master/Amazon_Reriews_ETL_process.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Project Objectives
Perform ETL in the cloud and analyze the data with a Natural Language Processing (NLP) pipeline that includes machine learning.
Part 2 is in the file 'Amazon_Reviews_NLP_ML.ipynb'.

## **Part_1: ETL**:
- Extract datasets from AWS S3.
- Transform them with PySpark in Colab.
- Load the results directly into AWS RDS via JDBC.

In [2]:
# Install Java, Spark, and findspark, then download a PostgreSQL JDBC driver
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Locate the Spark installation so that pyspark can be imported
import findspark
findspark.init()

--2020-03-14 18:00:01--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2020-03-14 18:00:02 (4.82 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [0]:
# Create a Spark session configured with the PostgreSQL driver
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('AmazonETL')\
        .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")\
        .getOrCreate()

#### EXTRACT
Connect to the data storage and extract the S3 data into a Spark DataFrame.
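Before handing a large file to Spark, it can help to peek at the header and a couple of rows. The following is a minimal standard-library sketch, not part of the original notebook: `peek_tsv_gz` is a hypothetical helper, and the sample bytes are made up to stand in for the real gzip-compressed TSV.

```python
import csv
import gzip
import io


def peek_tsv_gz(raw_bytes, n_rows=2):
    """Return (header, first n_rows rows) from gzip-compressed TSV bytes."""
    with gzip.open(io.BytesIO(raw_bytes), mode="rt", encoding="utf-8", newline="") as fh:
        # QUOTE_NONE treats quote characters inside review text as ordinary data.
        reader = csv.reader(fh, delimiter="\t", quoting=csv.QUOTE_NONE)
        header = next(reader)
        rows = [row for _, row in zip(range(n_rows), reader)]
    return header, rows


# Made-up sample standing in for the real reviews file.
sample = gzip.compress(
    b"review_id\tstar_rating\treview_body\n"
    b'R1\t5\tSmells "great"\n'
    b"R2\t4\tOK\n"
)
header, rows = peek_tsv_gz(sample)
print(header)
print(rows)
```

Reading only the first few rows this way is cheap, and it shows the column names and delimiter before Spark infers a schema over the full file.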

In [4]:
# Read in data from the S3 bucket
from pyspark import SparkFiles

url= "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Beauty_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get('amazon_reviews_us_Beauty_v1_00.tsv.gz'), sep='\t', header=True, inferSchema = True)
df.show(n=20,truncate=False)


[df.show() output truncated: first 20 rows of the 15-column reviews DataFrame]

#### TRANSFORM

Use PySpark to cast columns to the proper data types, drop rows with null or NaN values, and separate the data into 4 tables that match the RDS schema.
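The interaction between casting and dropping nulls can be illustrated without Spark. The sketch below is a plain-Python analogue, assuming the default Spark 2.4 behaviour in which a string that cannot be parsed as an integer becomes null on cast; `cast_int` and the sample rows are hypothetical.

```python
def cast_int(value):
    """Return int(value), or None when the value cannot be parsed."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


# Made-up rows: one clean, one with an unparseable rating, one already null.
rows = [
    {"review_id": "R1", "star_rating": "5"},
    {"review_id": "R2", "star_rating": "N/A"},
    {"review_id": "R3", "star_rating": None},
]

# Step 1: cast the column; values that fail to parse become None.
casted = [{**row, "star_rating": cast_int(row["star_rating"])} for row in rows]

# Step 2: keep only rows with no missing values (what dropna() does by default).
kept = [row for row in casted if all(v is not None for v in row.values())]
print(kept)
```

The order matters: casting first means malformed ratings are turned into nulls and then removed together with rows that were already incomplete.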

In [5]:
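# original dataset shape: (rows, columns)
print((df.count(), len(df.columns)))
# check data types (printSchema is a method and must be called)
df.printSchema()

(5115666, 15)
root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)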

In [6]:
# cast two columns to proper data types (star_rating, review_date)
from pyspark.sql.types import IntegerType, DateType
# chain both casts so the second withColumn does not discard the first
casted_df = df.withColumn('star_rating', df['star_rating'].cast(IntegerType())) \
              .withColumn('review_date', df['review_date'].cast(DateType()))
casted_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)

In [7]:
# drop any rows with null or NaN
droped_df = casted_df.dropna()
droped_df.count()

5114733

In [37]:
# count rows after dropping duplicates; an unchanged count means there are none
droped_df.dropDuplicates().count()

5114733

In [0]:
# Separate into 4 DataFrames
review_id_df = droped_df.select('review_id', 'customer_id', 'product_id', 'product_parent','review_date')

clean_products_df = droped_df.select('product_id','product_title')

clean_customers_df = droped_df.select('customer_id')

vine_df = droped_df.select('review_id', 'star_rating','helpful_votes','total_votes','vine')


In [17]:
# keep distinct (product_id, product_title) pairs to match the RDS table structure
products_df = clean_products_df.select('product_id','product_title').distinct()
products_df.count()

588771

In [28]:
# group by customer_id to get each unique customer and their review count

customers_df = clean_customers_df.groupBy('customer_id').count()
customers_df = customers_df.withColumnRenamed('count','customer_count')

# show in descending order of review count
customers_df.orderBy(customers_df['customer_count'].desc()).show()



+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   36771761|           871|
|   32405532|           721|
|   10942711|           685|
|   37446839|           587|
|   50199793|           564|
|   12201275|           519|
|    4808156|           508|
|   48233483|           440|
|   52520442|           378|
|   39789300|           346|
|   18609243|           343|
|   37337835|           337|
|   42799904|           314|
|   21012418|           288|
|   10592389|           284|
|   52433525|           250|
|   51126995|           245|
|   52824002|           235|
|   18715781|           230|
|   31120312|           219|
+-----------+--------------+
only showing top 20 rows
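The group-and-count step above has a direct plain-Python analogue, which can be handy for checking the logic on a small sample. This is only an illustrative sketch with made-up customer IDs, not the notebook's data.

```python
from collections import Counter

# Made-up customer IDs, one entry per review.
customer_ids = [101, 102, 101, 103, 101, 102]

# Counter plays the role of groupBy('customer_id').count().
customer_count = Counter(customer_ids)

# most_common() returns (customer_id, count) pairs sorted by count, descending.
ranked = customer_count.most_common()
print(ranked)
```

Each pair corresponds to one row of `customers_df`: the customer ID and the number of reviews written by that customer.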



In [35]:
# confirm that star_rating is an int in vine_df
vine_df = vine_df.withColumn('star_rating', vine_df['star_rating'].cast(IntegerType()))
vine_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)

#### LOAD
Connect PySpark to the RDS database by setting up the JDBC configuration.
Write the DataFrames directly to the corresponding RDS tables.
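One possible alternative to uploading a password file is to read the connection settings from environment variables. The sketch below is an assumption, not the notebook's method: `build_jdbc_settings` and the `RDS_*` variable names are hypothetical, and the example values are placeholders.

```python
import os


def build_jdbc_settings(env=None):
    """Build a PostgreSQL JDBC URL and properties dict from a mapping of settings."""
    env = os.environ if env is None else env
    host = env["RDS_HOST"]
    port = env.get("RDS_PORT", "5432")
    database = env["RDS_DATABASE"]
    url = f"jdbc:postgresql://{host}:{port}/{database}"
    properties = {
        "user": env.get("RDS_USER", "postgres"),
        "password": env["RDS_PASSWORD"],
        "driver": "org.postgresql.Driver",
    }
    return url, properties


# Placeholder values; in practice these would come from os.environ.
example_env = {
    "RDS_HOST": "example-host",
    "RDS_DATABASE": "Amazon_reviews",
    "RDS_PASSWORD": "not-a-real-password",
}
example_url, example_properties = build_jdbc_settings(example_env)
print(example_url)
```

The returned URL and properties have the same shape as the `jdbc_url` and `config` values passed to `DataFrame.write.jdbc`, so the password never has to appear in a cell or its output.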

In [29]:
# upload the RDS password file; assign the result so that the file
# contents (the password) are not echoed in the cell output
from google.colab import files
uploaded = files.upload()

Saving RDS_config.py to RDS_config.py

In [0]:
# set up the JDBC connection parameters
from RDS_config import password

mode = "append"
jdbc_url="jdbc:postgresql://dataviz.caktah2xv07p.us-east-2.rds.amazonaws.com:5432/Amazon_reviews"
config = {"user":"postgres",
          "password": password,  
          "driver":"org.postgresql.Driver"}

In [0]:
# write DataFrame into RDS directly via JDBC

customers_df.write.jdbc(url=jdbc_url,
                         table ='customers', 
                         mode=mode, properties = config)


In [0]:
products_df.write.jdbc(url=jdbc_url,
                         table ='products', 
                         mode=mode, properties = config)


In [0]:
review_id_df.write.jdbc(url=jdbc_url,
                         table ='review_id_table', 
                         mode=mode, properties = config)


In [0]:
vine_df.write.jdbc(url=jdbc_url,
                         table ='vine_table', 
                         mode=mode, properties = config)
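The four write cells above repeat the same call with a different DataFrame and table name. As a sketch, they could be folded into one loop; `write_tables` is a hypothetical helper that relies only on the `DataFrame.write.jdbc(url, table, mode, properties)` call already used in this notebook.

```python
def write_tables(tables, url, properties, mode="append"):
    """Write each DataFrame in `tables` (table name -> DataFrame) over JDBC.

    Returns the list of table names in the order they were written.
    """
    written = []
    for table_name, frame in tables.items():
        frame.write.jdbc(url=url, table=table_name, mode=mode, properties=properties)
        written.append(table_name)
    return written


# Usage with the names defined in the cells above:
# write_tables(
#     {
#         "customers": customers_df,
#         "products": products_df,
#         "review_id_table": review_id_df,
#         "vine_table": vine_df,
#     },
#     url=jdbc_url,
#     properties=config,
#     mode=mode,
# )
```

Keeping the table-to-DataFrame mapping in one place makes it harder to write a DataFrame to the wrong table. The writes still run one after another, so a failure partway through leaves the earlier tables already loaded.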