In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
!java -version

In [0]:
import findspark
findspark.init("spark-2.4.4-bin-hadoop2.7")  # SPARK_HOME
from pyspark import SparkFiles
from pyspark.sql import SparkSession

# The write cells at the end of this notebook use the JDBC driver class
# org.postgresql.Driver, which nothing else in the notebook puts on the classpath.
# spark.jars.packages makes Spark resolve the driver from Maven when the session starts.
# getOrCreate() reuses an already-running session, so this setting only takes effect
# on a fresh kernel.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.9")
    .getOrCreate()
)

# Load in user_data.csv from S3 into a DataFrame.
# Path-style S3 URLs have a single slash between the host and the bucket name.
url = "https://s3.amazonaws.com/<bucket name>/user_data.csv"
spark.sparkContext.addFile(url)

# SparkFiles.get resolves the local path of the file registered with addFile above.
user_data_df = spark.read.option('header', 'true').csv(SparkFiles.get("user_data.csv"), inferSchema=True, sep=',')
user_data_df.show(10)

In [0]:
# Load in user_payment.csv from S3 into a DataFrame.
# Path-style S3 URLs have a single slash between the host and the bucket name.

url = "https://s3.amazonaws.com/<bucket name>/user_payment.csv"
spark.sparkContext.addFile(url)

# SparkFiles.get resolves the local path of the file registered with addFile above.
user_payment_df = spark.read.option('header', 'true').csv(SparkFiles.get("user_payment.csv"), inferSchema=True, sep=',')
user_payment_df.show(10)

In [0]:
# Inner-join the user rows with the payment rows on their shared "username" column
joined_df = user_data_df.join(user_payment_df, "username", "inner")
joined_df.show()

In [0]:
# Discard every row that contains a null in any column
dropna_df = joined_df.na.drop()
dropna_df.show()

In [0]:
# Import the helper that builds a column expression from a column name
from pyspark.sql.functions import col

# Keep only the rows whose active_user value is True
cleaned_df = dropna_df.where(col("active_user") == True)
cleaned_df.show()

In [0]:
# Project the columns destined for the active_user table
clean_user_df = cleaned_df.select("id", "first_name", "last_name", "username")
clean_user_df.show()

In [0]:
# Project the columns destined for the billing_info table
clean_billing_df = cleaned_df.select("billing_id", "street_address", "state", "username")
clean_billing_df.show()

In [0]:
# Configure settings for RDS
import os

# Save mode used by the JDBC writes: rows are appended to the existing tables.
mode = "append"
# A PostgreSQL JDBC URL needs "//" before the host (jdbc:postgresql://host:port/database).
# With a single slash the driver does not parse "<endpoint>:5432" as host and port.
jdbc_url = "jdbc:postgresql://<endpoint>:5432/my_data_class_db"
config = {"user": "root",
          # Read the password from the RDS_PASSWORD environment variable when it is set,
          # so the real secret does not have to be saved in the notebook; otherwise fall
          # back to the placeholder.
          "password": os.environ.get("RDS_PASSWORD", "<password>"),
          "driver": "org.postgresql.Driver"}


In [0]:
# Push the user rows into the active_user table in RDS using the configured save mode

clean_user_df.write.mode(mode).jdbc(jdbc_url, "active_user", properties=config)

In [0]:
# Push the billing rows into the billing_info table in RDS using the configured save mode

clean_billing_df.write.mode(mode).jdbc(jdbc_url, "billing_info", properties=config)

In [0]:
# Write dataframe to payment_info table in RDS
# NOTE(review): `clean_payment_df` is not created in any cell visible in this notebook
# (only `clean_user_df` and `clean_billing_df` are), so unless it is defined elsewhere this
# cell raises NameError on a fresh run. TODO: build it from `cleaned_df` by selecting the
# columns of the payment_info table (confirm the column list against the table schema)
# before this write.

clean_payment_df.write.jdbc(url=jdbc_url, table='payment_info', mode=mode, properties=config)