# PySpark ETL

In [None]:
import os
# Pick a Spark 3.x release listed at https://archive.apache.org/dist/spark/ (the
# archive the wget below downloads from) and enter it as the spark version.
# For example:
# spark_version = 'spark-3.0.2'
spark_version = 'spark-3.1.2'
# Exported to the environment so the shell commands below can expand it as $SPARK_VERSION.
os.environ['SPARK_VERSION']=spark_version
# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
# Download and unpack the prebuilt Spark binaries (the Hadoop 2.7 build).
!wget -q https://archive.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark
# Set Environment Variables
# NOTE(review): these paths assume a Colab-style VM (JDK installed under
# /usr/lib/jvm, Spark unpacked into /content) -- confirm before running elsewhere.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"
# Use SPARK_HOME to make the unpacked `pyspark` package importable;
# the SparkSession itself is created in a later cell.
import findspark
findspark.init()

In [None]:
# Download the Postgres JDBC driver jar.
# The SparkSession cell puts /content/postgresql-42.2.16.jar on
# spark.driver.extraClassPath, so this assumes the working directory is
# /content (the Colab default) -- confirm if running elsewhere.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

In [None]:
# Import dependencies
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.datasets import make_blobs
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler,OneHotEncoder
import sklearn as skl
import tensorflow as tf
import os
from tensorflow.keras.callbacks import ModelCheckpoint
from pyspark import SparkFiles

In [None]:
# Build (or reuse) the "CloudETL" SparkSession, with the Postgres JDBC driver
# jar on the driver's classpath so the final write step can load it.
from pyspark.sql import SparkSession

jdbc_driver_jar = "/content/postgresql-42.2.16.jar"
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", jdbc_driver_jar)
    .getOrCreate()
)

## Import Dataset

In [None]:
# Import and read our input CSV dataset -- use ONE of the two options below
# (if both run, the second assignment to `df` replaces the first).

# Option 1: read a local CSV with pandas
df = pd.read_csv('<file_name.csv>')
df.head()

# Option 2: read the CSV from an S3 bucket into a Spark DataFrame.
# addFile() downloads the object to the driver; SparkFiles.get() returns its local path.
from pyspark import SparkFiles
url = "https://YOUR-BUCKET-NAME.s3.amazonaws.com/<file_name.csv>"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("<file_name.csv>"), sep=",", header=True, inferSchema=True)

# This is our db url = "database-1.czpjmlarn3xk.us-east-2.rds.amazonaws.com"
# Be sure to change the S3 bucket name to your bucket name

## Extract
Connect to the S3 bucket and extract the raw data into a DataFrame

In [None]:
# Pull the raw CSV out of the S3 bucket and load it as a Spark DataFrame.
from pyspark import SparkFiles

url = "https://YOUR-BUCKET-NAME.s3.amazonaws.com/<file_name.csv>"
sc = spark.sparkContext
sc.addFile(url)  # ship the object to the driver
local_csv = SparkFiles.get("<file_name.csv>")  # local path of the downloaded copy
df = spark.read.csv(local_csv, sep=",", header=True, inferSchema=True)


In [None]:
# Print the first rows of the Spark DataFrame (PySpark's defaults made explicit).
df.show(n=20, truncate=True)

## Transform
Transform the raw data loaded from S3 into the PySpark DataFrame

In [None]:
# Drop any rows with null or "not a number" (NaN) values.
# Spark's dropna() removes rows containing nulls and also NaNs in numeric columns.
dropna_df = df.dropna()
dropna_df.show()

# The transform cells below use the pandas API (drop(columns=...), nunique,
# value_counts, dtypes), so continue with a pandas copy of the cleaned data.
# NOTE(review): toPandas() collects every row onto the driver -- fine for
# small datasets; confirm the data fits in driver memory.
df = dropna_df.toPandas()

In [None]:
# Remove columns that carry no useful signal (replace the placeholder names).
unneeded_cols = ["", "N"]
df = df.drop(columns=unneeded_cols)
df.head()

In [None]:
# Determine the number of unique values in each column, and display the counts
# so high-cardinality columns (candidates for binning) can be spotted.
cnt = df.nunique(axis=0)
cnt

In [None]:
# Check a column's unique values with the pandas Series value_counts method.
# Replace "column_name" with the categorical column to inspect. Bracket access
# also works for names that contain spaces or clash with DataFrame attributes.
application_counts = df["column_name"].value_counts()
application_counts

In [None]:
# Kernel-density plot of the value counts for the inspected column (e.g.
# APPLICATION_TYPE); useful for choosing the binning cutoff below.
application_counts.plot(kind="density")

In [None]:
# Determine which values to replace: those occurring fewer than `cutoff` times.
# Pick the cutoff by inspecting the value counts / density plot.
cutoff = 500
replace_application = list(application_counts[application_counts < cutoff].index)

# Replace every rare value with "Other" in one vectorized pass on the working
# DataFrame `df`. This reduces the number of unique values.
df["column_name"] = df["column_name"].replace(replace_application, "Other")

# Check to make sure binning was successful
df["column_name"].value_counts()

In [None]:
# Collect the names of all object-dtype (categorical) columns; this list is
# what gets one-hot encoded next.
application_cat = [name for name, kind in df.dtypes.items() if kind == "object"]
application_cat

In [None]:
# Create a OneHotEncoder instance that returns a dense array.
from sklearn.preprocessing import OneHotEncoder
try:
    # scikit-learn >= 1.2 calls this option `sparse_output` (`sparse` was removed in 1.4).
    enc = OneHotEncoder(sparse_output=False)
except TypeError:
    # Older scikit-learn only understands `sparse`.
    enc = OneHotEncoder(sparse=False)

# Fit and transform the OneHotEncoder using the categorical variable list
encoded = enc.fit_transform(df[application_cat])

# Name each encoded column "<column>_<category>". get_feature_names() was
# removed in scikit-learn 1.2 in favour of get_feature_names_out().
if hasattr(enc, "get_feature_names_out"):
    encoded_names = enc.get_feature_names_out(application_cat)
else:
    encoded_names = enc.get_feature_names(application_cat)

# Reuse df's index so the index-based merge in the next step lines rows up
# correctly even if earlier steps left gaps in the index.
encode_df = pd.DataFrame(encoded, columns=encoded_names, index=df.index)
encode_df.head()

In [None]:
# Merge the one-hot encoded features (joined on the index) and drop the
# original categorical columns. pandas 2.0 no longer accepts the axis
# positionally (`drop(labels, 1)`), so name the columns explicitly.
merged_df = df.merge(encode_df, left_index=True, right_index=True).drop(columns=application_cat)
merged_df.head()

## Load the transformed data into our database

In [None]:
# Settings for writing to the Postgres RDS instance (fill in the placeholders).
from getpass import getpass

jdbc_url = "jdbc:postgresql://<connection string>:5432/<database-name>"
mode = "append"  # Spark save mode: add rows to the table instead of replacing it

# Ask for the password interactively (not echoed) instead of hard-coding it.
password = getpass('Enter database password')
config = dict(
    user="postgres",
    password=password,
    driver="org.postgresql.Driver",
)

## Write the cleaned DataFrame directly into the database

In [None]:
# Write the transformed data to the target table in RDS over JDBC.
# merged_df comes from a pandas merge in the previous cell, and pandas
# DataFrames have no `.write`, so convert it to a Spark DataFrame first.
spark_merged_df = (
    spark.createDataFrame(merged_df)
    if isinstance(merged_df, pd.DataFrame)
    else merged_df
)
spark_merged_df.write.jdbc(url=jdbc_url, table='<table_name>', mode=mode, properties=config)

# If we are loading into multiple tables, create a separate DataFrame to match each table


## Validate that the data was written successfully by running queries against the database in pgAdmin