## Import Dependencies

In [None]:
# for general use
import os

# define spark parameters (version info can be found at http://www.apache.org/dist/spark/)
spark_version = 'spark-3.2.3'
os.environ['SPARK_VERSION'] = spark_version

# install spark and java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# set the environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# initialize spark
import findspark
findspark.init()

In [None]:
# install jdbc
# Download the PostgreSQL JDBC driver jar into the current working directory.
# The SparkSession cell puts /content/postgresql-42.2.9.jar on the driver classpath,
# so the version in this URL and that path must be kept in sync.
# NOTE(review): 42.2.9 is an old pgjdbc release that appears to predate fixes for
# published pgjdbc security advisories. Consider upgrading; if you do, change this
# URL and the classpath entry together.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

## Import Raw Data

In [None]:
# Create the Spark session, or reuse one that already exists.
# The PostgreSQL JDBC jar fetched by the earlier wget cell is put on the driver
# classpath so that the final cell can write to the database over JDBC
# (the path assumes the working directory is /content).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [None]:
# SparkFiles resolves the local path of files registered with SparkContext.addFile
from pyspark import SparkFiles

# S3 locations of the two gzipped TSV review datasets
# NOTE(review): confirm this bucket is still publicly reachable before rerunning.
url_home_improvement = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Home_Improvement_v1_00.tsv.gz"
url_tools = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Tools_v1_00.tsv.gz"

# download both files through Spark so SparkFiles.get can locate them below
for dataset_url in (url_home_improvement, url_tools):
    spark.sparkContext.addFile(dataset_url)

# reader options shared by both files: tab separated, first line holds the column names
tsv_options = dict(sep="\t", header=True)

home_improvement_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Home_Improvement_v1_00.tsv.gz"), **tsv_options)
tools_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Tools_v1_00.tsv.gz"), **tsv_options)

## Examine, Clean, and Transform Data

In [None]:
# Stack the two review tables into one dataframe.
# union matches columns by position, not by name; this presumably relies on both TSV
# files sharing the same header layout (TODO confirm).
df = home_improvement_df.union(tools_df)

# row counts of each input and of the combined dataframe (each count runs a Spark job)
home_improvement_row_count, tools_row_count, row_count = (
    frame.count() for frame in (home_improvement_df, tools_df, df)
)

# sanity check: the union should contain exactly the rows of both inputs
print(f"{home_improvement_row_count:,} + {tools_row_count:,} = {row_count:,}")
counts_match = home_improvement_row_count + tools_row_count == row_count
print("The row counts add up." if counts_match else "The row counts don't add up.")

# display the first rows of the combined dataframe
df.show()

In [None]:
# count null, none, and nan for each of the raw dataframe columns
from pyspark.sql.functions import col, isnan, when, count

# Build one aggregate per column. Each counts the rows whose value is NaN or null
# and is labelled with the column's own name.
missing_value_counts = []
for column_name in df.columns:
    is_missing = isnan(column_name) | col(column_name).isNull()
    missing_value_counts.append(count(when(is_missing, column_name)).alias(column_name))

df.select(missing_value_counts).show()

In [None]:
# Clean the raw data in two steps:
# first drop any row with a null in any column, then drop exact duplicate rows.
clean_df = df.dropna()
clean_df = clean_df.distinct()

# report how many rows survived and how many were discarded (row_count comes from the union cell)
clean_row_count = clean_df.count()
removed_row_count = row_count - clean_row_count
print(f"There are {clean_row_count:,.0f} rows in the clean dataframe. A total of {removed_row_count:,.0f} rows were removed.")

# repeat the per-column null/NaN count on the cleaned data to confirm the cleanup
clean_df.select([count(when(isnan(name) | col(name).isNull(), name)).alias(name) for name in clean_df.columns]).show()

In [None]:
# check the datatypes for all columns
# The TSV files were read without inferSchema, so every column is expected to still be
# a string here. The next cell casts the numeric and date columns.
clean_df.printSchema()

In [None]:
# convert column data types to match the database schema
from pyspark.sql.functions import col, unix_timestamp, to_date
from pyspark.sql.types import IntegerType

# product_parent    str -> int
# star_rating       str -> int
# helpful_votes     str -> int
# total_votes       str -> int
# review_date       str -> date
integer_columns = ["product_parent", "star_rating", "helpful_votes", "total_votes"]

# Convert data types. With Spark's default (non-ANSI) settings, a value that cannot be
# parsed becomes null instead of raising an error.
for column_name in integer_columns:
    clean_df = clean_df.withColumn(column_name, col(column_name).cast(IntegerType()))
# parse the yyyy-MM-dd string straight into a date column
clean_df = clean_df.withColumn("review_date", to_date(col("review_date"), "yyyy-MM-dd"))

# columns stored in the review_data table, in insertion order
final_columns = ["review_id", "product_id", "product_parent", "product_title", "product_category", "star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase", "review_headline", "review_body", "review_date"]

# The earlier null cleanup ran on the raw strings. A value that failed to convert above
# would therefore reach the database as NULL, so drop those rows here. Doing this before
# de-duplicating lets a valid row win over a malformed row that has the same review_id.
final_df = (
    clean_df.select(final_columns)
    .dropna(subset=integer_columns + ["review_date"])
    .dropDuplicates(["review_id"])
)

final_df.show()
final_df.printSchema()

## Load Data

In [None]:
# import the confidential Amazon AWS RDS information
from config import rds_endpoint, rds_password, rds_dbname, rds_username, rds_port

In [None]:
# JDBC connection string for the PostgreSQL database, built from the imported RDS settings
jdbc_url = "jdbc:postgresql://{}:{}/{}".format(rds_endpoint, rds_port, rds_dbname)

# connection properties handed to the JDBC writer: credentials plus the driver class name
config = dict(
    user=str(rds_username),
    password=str(rds_password),
    driver="org.postgresql.Driver",
)

# save mode for the write: add rows to the existing table rather than replacing it
my_mode = "append"

In [None]:
# write the final dataframe into the review_data table over JDBC, using the save mode and connection properties defined above
final_df.write.jdbc(jdbc_url, "review_data", mode=my_mode, properties=config)