In [None]:
import os
# Find the latest version of spark 3.2.x  from http://www.apache.org/dist/spark/ and enter as the spark version
spark_version = 'spark-3.2.2'

os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
# Install the PostgreSQL driver in our Colab environment
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

In [None]:
# Build (or reuse) the Spark session, putting the downloaded Postgres JDBC jar
# on the driver classpath
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [None]:
# Fetch the first CSV file from the S3 bucket and load it into a DataFrame
from pyspark import SparkFiles

url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.1/22-big-data/day_3/user_data.csv"

# Register the remote file with Spark so SparkFiles.get can resolve it by name
spark.sparkContext.addFile(url)

user_data_df = spark.read.csv(
    SparkFiles.get("user_data.csv"),
    sep=",",
    header=True,
    inferSchema=True,
)

# Preview the loaded rows
user_data_df.show()

In [None]:
# Fetch the next CSV file from the S3 bucket
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.1/22-big-data/day_3/user_payment.csv"
spark.sparkContext.addFile(url)

# Load the file and, in the same step, rename its id column to billing_id
user_payment_df = (
    spark.read
    .csv(
        SparkFiles.get("user_payment.csv"),
        sep=",",
        header=True,
        inferSchema=True,
    )
    .withColumnRenamed("id", "billing_id")
)

# Preview the loaded rows
user_payment_df.show()

In [None]:
# Inner-join user data with payment data on the shared username column
joined_df = user_data_df.join(
    user_payment_df,
    on="username",
    how="inner",
)
joined_df.show()

In [None]:
# Discard every row that has a null in any column
dropna_df = joined_df.dropna(
    how="any",
)
dropna_df.show()

In [None]:
# Bring in col(), which lets us refer to a DataFrame column by name
from pyspark.sql.functions import col

# Keep only the rows for active users.
# `== True` is deliberate: it builds a Spark column expression for the filter.
cleaned_df = dropna_df.filter(
    col("active_user") == True
)
cleaned_df.show()

In [None]:
# Columns destined for the active_user table in our database
clean_user_df = cleaned_df.select(
    "billing_id",
    "first_name",
    "last_name",
    "username",
)
clean_user_df.show()

In [None]:
# Columns destined for the billing_info table in our database
clean_billing_df = cleaned_df.select(
    "billing_id",
    "street_address",
    "state",
    "username",
)
clean_billing_df.show()

In [None]:
# Columns destined for the payment_info table in our database
clean_payment_df = cleaned_df.select(
    "billing_id",
    "cc_encrypted",
)
clean_payment_df.show()

Postgres Setup

In [None]:
# IMPORTANT: Replace each of these parameters with your own values for your AWS RDS instance
import os
from getpass import getpass

my_aws_endpoint = 'mypostgresdb.czwtrbwfkobv.us-east-2.rds.amazonaws.com' # This is my value; please replace with your own
my_aws_port_number = '5432' # Your value is likely the same, but please double check
my_aws_database_name = 'my_database' # This is my value; please replace with your own
my_aws_username = 'postgres' # Your value is likely the same, but please double check

# Do not type the password into the notebook: a saved or shared copy would leak it.
# It is read from the MY_AWS_PASSWORD environment variable when set; otherwise you
# are prompted for it (the input is not echoed and is not stored in the notebook).
my_aws_password = os.environ.get('MY_AWS_PASSWORD') or getpass('PostgreSQL password: ')


In [None]:
# Assemble the JDBC connection string from the RDS endpoint, port and database name
jdbc_url = f"jdbc:postgresql://{my_aws_endpoint}:{my_aws_port_number}/{my_aws_database_name}"

# Connection properties passed to Spark's JDBC writer
config = dict(
    user=f"{my_aws_username}",
    password=f"{my_aws_password}",
    driver="org.postgresql.Driver",
)

# Save mode for the writes below: replace whatever data the tables already hold.
# 'append' is probably the smarter choice when the PostgreSQL schema generates
# the primary key automatically, but 'overwrite' works best for this little demo.
mode = "overwrite"


In [None]:
# Save the active-user DataFrame to the active_user table in your PostgreSQL RDS
clean_user_df.write.jdbc(
    url=jdbc_url,
    table="active_user",
    mode=mode,
    properties=config,
)

In [None]:
# Save the billing DataFrame to the billing_info table in your PostgreSQL RDS
clean_billing_df.write.jdbc(
    url=jdbc_url,
    table="billing_info",
    mode=mode,
    properties=config,
)

In [None]:
# Save the payment DataFrame to the payment_info table in your PostgreSQL RDS
clean_payment_df.write.jdbc(
    url=jdbc_url,
    table="payment_info",
    mode=mode,
    properties=config,
)