In [1]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.1'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

In [2]:
# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz
!tar xf spark-3.2.1-bin-hadoop2.7.tgz
!pip install -q findspark

0% [Working]            Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.91.39)] [Connecting to security.ub0% [1 InRelease gpgv 1,581 B] [Waiting for headers] [Waiting for headers] [Wait                                                                               Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [1 InRelease gpgv 1,581 B] [Waiting for headers] [Waiting for headers] [Wait                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:5 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:7 http://archive.ubuntu.com/ubu

In [3]:
# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

In [4]:
# Start a SparkSession
import findspark
findspark.init()

In [5]:
#connect to postgres
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-05-31 21:52:40--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.1’


2022-05-31 21:52:40 (10.3 MB/s) - ‘postgresql-42.2.9.jar.1’ saved [914037/914037]



# LEVEL 1

In [6]:
#create a pyspark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("HomeImprovementETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [7]:
from pyspark import SparkFiles
# Load in user_data.csv from S3 into a DataFrame
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Home_Improvement_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

df = spark.read.option('header', 'true').csv(SparkFiles.get("amazon_reviews_us_Home_Improvement_v1_00.tsv.gz"), inferSchema=True, sep='\t', timestampFormat="yyyy-mm-dd")
df.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   48881148|R215C9BDXTDQOW|B00FR4YQYK|     381800308|SadoTech Model C ...|Home Improvement|          4|            0|          0|   N|                Y|          Four Stars|        good product|2015-01-31 00:08:00|
|         US|   47882936|R1DTPUV1J57YHA|B00439MYYE|     921341748|iSpring T32M 3.2 ...|H

In [8]:
#Count the number of records in the dataset
rowCount = df.count()
print(f"The number of rows is: {rowCount}")

The number of rows is: 2634781


In [9]:
#Transform the dataset to fit the tables in the schema file. Be sure the DataFrames match in data type and in column name.
#Check the current schema
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [10]:
#create the review ID dataframe
review_df = df.select(['review_id','customer_id','product_id','product_parent','review_date'])
review_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [11]:
#create the products dataframe
products_df = df.select(['product_id','product_title'])
products_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_title: string (nullable = true)



In [18]:
#clean up duplicates
products_df = products_df.dropDuplicates(["product_id"])

In [12]:
#create the customers dataframe
customer_df = df.select(['customer_id'])
#since count is not in the original dataframe, use groupby to get counts
customer_df = customer_df.groupby('customer_id').count()
customer_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- count: long (nullable = false)



In [13]:
#Load the DataFrames that correspond to tables into an RDS instance. Note: This process can take up to 10 minutes for each. Be sure that everything is correct before uploading.
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://mypostgresserver.cip4zzqc5usy.us-east-1.rds.amazonaws.com:5432/my_data_class_db"
config = {"user":"root", 
          "password": "Q2=w3e1r%t1", 
          "driver":"org.postgresql.Driver"}


In [14]:
#write review ID table
review_df.write.jdbc(url=jdbc_url, table="review_id_table", mode=mode, properties=config)

In [19]:
#write products table
products_df.write.jdbc(url=jdbc_url, table="products", mode=mode, properties=config)

In [21]:
#update customer_df to match headers (note, this shold be done when cleaning the data, but was not done here to prevent re-running the book and using AWS resources)
from pyspark.sql.functions import col
customer_df = customer_df.select(col("customer_id").alias("customer_id"), col("count").alias("customer_count"))

#write customers table
customer_df.write.jdbc(url=jdbc_url, table="customers", mode=mode, properties=config)

## Level 2

In [22]:
#create the vine table dataframe
vine_df = df.select(['review_id','star_rating','helpful_votes','total_votes','vine'])
vine_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)



In [24]:
#write to the vine table
vine_df.write.jdbc(url=jdbc_url, table="vine_table", mode=mode, properties=config)

In [25]:
#stop the spark session to clear up connections on AWS
spark.stop()