<a href="https://colab.research.google.com/github/susiexia/BigData_ETL-on-Amazon-dataset/blob/master/pyspark_RDB_ETL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### **ETL** process: extracting datasets from AWS S3, transforming them with PySpark in Colab, and loading them directly into AWS RDS via JDBC

Prep steps: upload the datasets to S3, and set up the database in RDS ahead of time using pgAdmin.

In [0]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Locate the Spark installation so that pyspark can be imported
import findspark
findspark.init()

To write from Spark directly to a Postgres database, download **a Postgres driver** that allows Spark to interact with Postgres:

In [4]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-03-14 00:11:50--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2020-03-14 00:11:50 (3.60 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



Create a Spark session configured with the downloaded Postgres driver in the `/content` folder:

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('BigDataETL')\
        .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")\
        .getOrCreate()

## EXTRACT

Connect to the data storage and extract the S3 data into Spark DataFrames

In [6]:
# Read in data from S3 buckets
from pyspark import SparkFiles

url_1= "https://susiexiadatavizexample.s3.us-east-2.amazonaws.com/user_data.csv"
spark.sparkContext.addFile(url_1)
user_df = spark.read.csv(SparkFiles.get('user_data.csv'), sep=',', header=True, inferSchema = True)
user_df.show(n=5,truncate=False)

url_2 = 'https://susiexiadatavizexample.s3.us-east-2.amazonaws.com/user_payment.csv'
spark.sparkContext.addFile(url_2)
payment_df=spark.read.csv(SparkFiles.get('user_payment.csv'), sep=',',header=True)
payment_df.show(5)

+---+----------+---------+-----------+-------------------+--------------+---------+
|id |first_name|last_name|active_user|street_address     |state         |username |
+---+----------+---------+-----------+-------------------+--------------+---------+
|1  |Cletus    |Lithcow  |false      |78309 Riverside Way|Virginia      |ibearham0|
|2  |Caz       |Felgat   |false      |83 Hazelcrest Place|Alabama       |wwaller1 |
|3  |Kerri     |Crowson  |false      |112 Eliot Pass     |North Carolina|ichesnut2|
|4  |Freddie   |Caghy    |false      |15 Merchant Way    |New York      |tsnarr3  |
|5  |Sadella   |Deuss    |false      |079 Acker Avenue   |Tennessee     |fwherrit4|
+---+----------+---------+-----------+-------------------+--------------+---------+
only showing top 5 rows

+----------+---------+--------------------+
|billing_id| username|        cc_encrypted|
+----------+---------+--------------------+
|         1|ibearham0|a799fcafe47d7fb19...|
|         2| wwaller1|a799fcafe47d7fb19...|
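
`SparkFiles.get` is called with the bare file name rather than the full URL, because `addFile` makes the download available under its file name. A minimal sketch of deriving that name from the URL instead of typing it twice; `spark_file_name` is a hypothetical helper, not part of PySpark:

```python
import posixpath
from urllib.parse import urlparse


def spark_file_name(url: str) -> str:
    """Return the last path component of a URL (hypothetical helper, not a PySpark API)."""
    return posixpath.basename(urlparse(url).path)


url_1 = "https://susiexiadatavizexample.s3.us-east-2.amazonaws.com/user_data.csv"
print(spark_file_name(url_1))  # user_data.csv
```

In the extract cell, `SparkFiles.get(spark_file_name(url_1))` could then stand in for the hard-coded `'user_data.csv'`.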

## TRANSFORM

Use PySpark to join, clean, change data types, filter, and separate the data into 3 tables

In [7]:
# Inner-join the two DataFrames on username
joined_df = user_df.join(payment_df, on='username', how='inner')
joined_df.show(n=5, truncate = False)

+------------+---+----------+---------+-----------+----------------------+--------------------+----------+----------------------------------------+
|username    |id |first_name|last_name|active_user|street_address        |state               |billing_id|cc_encrypted                            |
+------------+---+----------+---------+-----------+----------------------+--------------------+----------+----------------------------------------+
|ibearham0   |1  |Cletus    |Lithcow  |false      |78309 Riverside Way   |Virginia            |1         |a799fcafe47d7fb19bfb02cd83855fdfc34b9f87|
|wwaller1    |2  |Caz       |Felgat   |false      |83 Hazelcrest Place   |Alabama             |2         |a799fcafe47d7fb19bfb02cd83855fdfc34b9f87|
|ichesnut2   |3  |Kerri     |Crowson  |false      |112 Eliot Pass        |North Carolina      |3         |a799fcafe47d7fb19bfb02cd83855fdfc34b9f87|
|tsnarr3     |4  |Freddie   |Caghy    |false      |15 Merchant Way       |New York            |4         |a799fc

In [8]:
# dropna() to drop any rows with null or NaN
cleaned_df = joined_df.dropna()
cleaned_df.show(n=10, truncate = False)

+-----------+---+----------+---------+-----------+--------------------+--------------------+----------+----------------------------------------+
|username   |id |first_name|last_name|active_user|street_address      |state               |billing_id|cc_encrypted                            |
+-----------+---+----------+---------+-----------+--------------------+--------------------+----------+----------------------------------------+
|ibearham0  |1  |Cletus    |Lithcow  |false      |78309 Riverside Way |Virginia            |1         |a799fcafe47d7fb19bfb02cd83855fdfc34b9f87|
|wwaller1   |2  |Caz       |Felgat   |false      |83 Hazelcrest Place |Alabama             |2         |a799fcafe47d7fb19bfb02cd83855fdfc34b9f87|
|ichesnut2  |3  |Kerri     |Crowson  |false      |112 Eliot Pass      |North Carolina      |3         |a799fcafe47d7fb19bfb02cd83855fdfc34b9f87|
|tsnarr3    |4  |Freddie   |Caghy    |false      |15 Merchant Way     |New York            |4         |a799fcafe47d7fb19bfb02cd838

In [62]:
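# No parentheses here, so the notebook displays the bound method (its repr includes the schema);
# cleaned_df.printSchema() would print the schema as a tree instead.
cleaned_df.printSchema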

<bound method DataFrame.printSchema of DataFrame[username: string, id: int, first_name: string, last_name: string, active_user: boolean, street_address: string, state: string, billing_id: string, cc_encrypted: string]>

In [72]:
# Cast billing_id from string to integer (user_payment.csv was read without inferSchema)
from pyspark.sql.types import IntegerType

cleaned_df = cleaned_df.withColumn('billing_id', cleaned_df['billing_id'].cast(IntegerType()))
cleaned_df.printSchema  # no parentheses: displays the bound method, whose repr includes the schema

<bound method DataFrame.printSchema of DataFrame[username: string, id: int, first_name: string, last_name: string, active_user: boolean, street_address: string, state: string, billing_id: int, cc_encrypted: string]>

In [73]:
# Keep only the rows for active users
filtered_df = cleaned_df.filter(cleaned_df.active_user == True)
filtered_df.show(n=5, truncate = False)

+-------------+---+----------+-----------+-----------+------------------------+--------------------+----------+----------------------------------------+
|username     |id |first_name|last_name  |active_user|street_address          |state               |billing_id|cc_encrypted                            |
+-------------+---+----------+-----------+-----------+------------------------+--------------------+----------+----------------------------------------+
|fstappard5   |6  |Fraser    |Korneev    |true       |76084 Novick Court      |Minnesota           |6         |a799fcafe47d7fb19bfb02cd83855fdfc34b9f87|
|lhambling6   |7  |Demott    |Rapson     |true       |86320 Dahle Park        |District of Columbia|7         |a799fcafe47d7fb19bfb02cd83855fdfc34b9f87|
|wheinerte    |15 |Sadella   |Jaram      |true       |7528 Waxwing Terrace    |Connecticut         |15        |a799fcafe47d7fb19bfb02cd83855fdfc34b9f87|
|droughsedgeg |17 |Hewitt    |Trammel    |true       |2455 Corry Alley        |Nor

In [78]:
# Separate into 3 DataFrames that match the database tables, using select()
clean_user_df = filtered_df.select('id','first_name','last_name','username')
#clean_user_df.show()

clean_bill_df = filtered_df.select('billing_id','street_address','state','username')
#clean_bill_df.show()

clean_payment_df = filtered_df.select('billing_id','cc_encrypted')
#clean_payment_df.show()

clean_bill_df.printSchema  # no parentheses: displays the bound method, whose repr includes the schema

<bound method DataFrame.printSchema of DataFrame[billing_id: int, street_address: string, state: string, username: string]>
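
To make the semantics of the transform steps concrete, the same chain (inner join, drop nulls, cast, filter, select) can be mirrored on plain Python dictionaries. This is only an illustrative sketch, not how Spark executes the work; the helper names are hypothetical, and the two sample rows are copied from the outputs shown above.

```python
def inner_join(left, right, key):
    """Pair every left row with every right row that has the same key value."""
    return [{**a, **b} for a in left for b in right if a[key] == b[key]]


def drop_nulls(rows):
    """Keep only rows without None values (roughly what DataFrame.dropna() does)."""
    return [row for row in rows if all(v is not None for v in row.values())]


def select(rows, columns):
    """Project each row onto the given columns (like DataFrame.select())."""
    return [{c: row[c] for c in columns} for row in rows]


def transform(users, payments):
    """Mirror the notebook's transform steps and return one row list per target table."""
    joined = inner_join(users, payments, "username")
    cleaned = drop_nulls(joined)
    # int() raises on a malformed string; this sketch assumes billing_id holds clean digits.
    typed = [{**row, "billing_id": int(row["billing_id"])} for row in cleaned]
    active = [row for row in typed if row["active_user"]]
    return {
        "active_user": select(active, ["id", "first_name", "last_name", "username"]),
        "billing_info": select(active, ["billing_id", "street_address", "state", "username"]),
        "payment_info": select(active, ["billing_id", "cc_encrypted"]),
    }


# Two rows taken from the outputs above; billing_id starts as a string, as in payment_df.
users = [
    {"id": 1, "first_name": "Cletus", "last_name": "Lithcow", "active_user": False,
     "street_address": "78309 Riverside Way", "state": "Virginia", "username": "ibearham0"},
    {"id": 6, "first_name": "Fraser", "last_name": "Korneev", "active_user": True,
     "street_address": "76084 Novick Court", "state": "Minnesota", "username": "fstappard5"},
]
payments = [
    {"billing_id": "1", "username": "ibearham0",
     "cc_encrypted": "a799fcafe47d7fb19bfb02cd83855fdfc34b9f87"},
    {"billing_id": "6", "username": "fstappard5",
     "cc_encrypted": "a799fcafe47d7fb19bfb02cd83855fdfc34b9f87"},
]

tables = transform(users, payments)
print(tables["active_user"])
```

Only the active user survives the filter, and each of the three row lists carries just the columns of its target table.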

## LOAD 

Connect PySpark to the RDS database by setting up the JDBC configuration, then write the DataFrames to the RDS tables.

In [56]:
from google.colab import files
uploaded = files.upload()

Saving config.py to config.py


In [0]:
from config import password
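
`config.py` itself is not shown; the import implies it defines a module-level `password`. One alternative that avoids uploading a file is to read the password from an environment variable. The sketch below assumes that approach; the variable name `RDS_PASSWORD` and the helper `load_rds_password` are hypothetical and do not appear in the notebook.

```python
import os


def load_rds_password(env_var: str = "RDS_PASSWORD") -> str:
    """Return the RDS password from the environment, failing loudly if it is unset or empty."""
    value = os.environ.get(env_var)
    if not value:
        raise RuntimeError(f"Set the {env_var} environment variable before connecting to RDS")
    return value


# Usage (assumes the variable was set earlier in the session):
# password = load_rds_password()
```

Either way, the point is to keep the credential out of the notebook that gets committed.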

In [0]:
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://dataviz.caktah2xv07p.us-east-2.rds.amazonaws.com:5432/cloud_ETL_practice"
config = {"user":"postgres",
          "password": password,   #RDS server password
          "driver":"org.postgresql.Driver"}

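The JDBC URL above follows the PostgreSQL pattern `jdbc:postgresql://host:port/database`. As a sketch of how the URL and the connection properties could be assembled from their parts, the hypothetical helper `build_jdbc_settings` below (not part of PySpark) returns both; the password in the usage line is a placeholder.

```python
def build_jdbc_settings(host, database, user, password, port=5432):
    """Return (jdbc_url, properties) for a PostgreSQL JDBC connection (hypothetical helper)."""
    url = f"jdbc:postgresql://{host}:{port}/{database}"
    properties = {
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",  # driver class provided by the downloaded jar
    }
    return url, properties


jdbc_url, config = build_jdbc_settings(
    host="dataviz.caktah2xv07p.us-east-2.rds.amazonaws.com",
    database="cloud_ETL_practice",
    user="postgres",
    password="placeholder-password",  # placeholder, not a real credential
)
print(jdbc_url)
```
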
In [0]:
# Write DF to table in RDS

clean_user_df.write.jdbc(url=jdbc_url,
                         table ='active_user', 
                         mode=mode, properties = config)



In [0]:
clean_bill_df.write.jdbc(url=jdbc_url, table = 'billing_info',
                         mode=mode,properties = config)


In [0]:

clean_payment_df.write.jdbc(url=jdbc_url, mode=mode,
                            table = 'payment_info', properties=config)
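
The three write cells repeat the same call with a different DataFrame and table name. One way to consolidate them, sketched here as a hypothetical helper `write_tables`, is to loop over a mapping from table name to DataFrame. It relies only on the `DataFrame.write.jdbc(url, table, mode, properties)` call already used above.

```python
def write_tables(frames, url, properties, mode="append"):
    """Write each DataFrame in `frames` (table name -> DataFrame) to its JDBC table.

    Returns the table names in the order they were written.
    """
    written = []
    for table, df in frames.items():
        df.write.jdbc(url=url, table=table, mode=mode, properties=properties)
        written.append(table)
    return written


# Usage with the notebook's variables (needs the live Spark session and RDS access):
# write_tables(
#     {"active_user": clean_user_df,
#      "billing_info": clean_bill_df,
#      "payment_info": clean_payment_df},
#     jdbc_url, config, mode,
# )
```

With `mode="append"`, rerunning the writes adds the same rows again, so the loop is best run once per load.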