# Import Dependencies

In [1]:
import os

# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
spark_version = 'spark-3.0.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (91.189.88.152)] [W                                                                               Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Hit:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:8 http://security.ubuntu.com/ubuntu bionic-security InRelease
Hit:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:11 http://arc

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2021-11-14 18:35:26--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.2’


2021-11-14 18:35:26 (6.06 MB/s) - ‘postgresql-42.2.9.jar.2’ saved [914037/914037]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("dataset_1_ETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

# Load Amazon Data into Spark DataFrame

In [4]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Pet_Products_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

In [5]:
# set schema
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, DateType

schema = [StructField("marketplace", StringType(), True),
          StructField("customer_id", IntegerType(), True),
          StructField("review_id", StringType(), True),
          StructField("product_id", StringType(), True),
          StructField("product_parent", IntegerType(), True),
          StructField("product_title", StringType(), True),
          StructField("product_category", StringType(), True),
          StructField("star_rating", IntegerType(), True),
          StructField("helpful_votes", IntegerType(), True),
          StructField("total_votes", IntegerType(), True),
          StructField("vine", StringType(), True),
          StructField("verified_purchase", StringType(), True),
          StructField("review_headline", StringType(), True),
          StructField("review_body", StringType(), True),
          StructField("review_date", DateType(), True),]

final=StructType(fields=schema)

In [6]:
pet_products_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Pet_Products_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)

# Show DataFrame
pet_products_df.show(truncate=False)

+-----------+-----------+--------------+----------+--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+-----------+-------------+-----------+----+-----------------+---------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

# Set Tables for Import to Database

In [7]:
# show number of rows in dataset
pet_products_df.select("customer_id").count()

2643619

In [8]:
# find unique customers and count number of occurrences
customers_df = pet_products_df.select("customer_id").groupBy("customer_id").count().withColumnRenamed("count","customer_count")
customers_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   10270641|             1|
|   18365872|             1|
|   16711087|             1|
|   10742726|             2|
|   41169638|             1|
|   43622307|             1|
|   24540309|             2|
|   28258386|             1|
|   35329257|             2|
|   14552054|             1|
|   14529507|             5|
|   45392827|             5|
|   47282953|             1|
|    8201930|             1|
|   20109760|             2|
|   16405801|             4|
|   15056685|            21|
|   20840575|             2|
|   39048303|             1|
|    5596610|             1|
+-----------+--------------+
only showing top 20 rows



In [9]:
customers_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: long (nullable = false)



In [10]:
# find unique products and their titles
from pyspark.sql import Row
products_df = pet_products_df.dropDuplicates((["product_id"])).select("product_id", "product_title")

products_df.show(truncate=False)

+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|product_id|product_title                                                                                                                                                                    |
+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0764102885|Saint Bernards (Barron's Complete Pet Owner's Manuals (Paperback))                                                                                                               |
|B00006OAM1|Petstep 560 PetSTEP Flotation Kit                                                                                                                                                |
|B000084E44|Omega Paw Self-Cleaning Litter Bo

In [11]:
products_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_title: string (nullable = true)



In [12]:
# find info specified for review table
review_id_table_df = pet_products_df.select("review_id", "customer_id", "product_id", "product_parent", "review_date")
review_id_table_df.show(truncate=False)

+--------------+-----------+----------+--------------+-----------+
|review_id     |customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|REAKC26P07MDN |28794885   |B00Q0K9604|510387886     |2015-08-31 |
|R3NU7OMZ4HQIEG|11488901   |B00MBW5O9W|912374672     |2015-08-31 |
|R14QJW3XF8QO1P|43214993   |B0084OHUIO|902215727     |2015-08-31 |
|R2HB7AX0394ZGY|12835065   |B001GS71K2|568880110     |2015-08-31 |
|RGKMPDQGSAHR3 |26334022   |B004ABH1LG|692846826     |2015-08-31 |
|R1DJCVPQGCV66E|22283621   |B00AX0LFM4|590674141     |2015-08-31 |
|R3V52EAWLPBFQG|14469895   |B00DQFZGZ0|688538603     |2015-08-31 |
|R3DKO8J1J28QBI|50896354   |B00DIRF9US|742358789     |2015-08-31 |
|R764DBXGRNECG |18440567   |B00JRCBFUG|869798483     |2015-08-31 |
|RW1853GAT0Z9F |50502362   |B000L3XYZ4|501118658     |2015-08-31 |
|R33GITXNUF1AD4|33930128   |B00BOEXWFG|454737777     |2015-08-31 |
|R1H7AVM81TAYRV|43534290   |B001HBBQKY|420905252     |2015-08-

In [13]:
review_id_table_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: string (nullable = true)



In [14]:
from pyspark.sql.functions import to_date
review_id_table_df = review_id_table_df.withColumnRenamed("review_date","date_str")
review_id_table_df = review_id_table_df.withColumn("review_date", to_date(review_id_table_df["date_str"])).select("review_id","customer_id","product_id","product_parent","review_date")
review_id_table_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: date (nullable = true)



In [15]:
# find info specified for vine table
vine_table_df = pet_products_df.select("review_id", "star_rating", "helpful_votes", "total_votes", "vine")
vine_table_df.show(truncate=False)

+--------------+-----------+-------------+-----------+----+
|review_id     |star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|REAKC26P07MDN |5          |0            |0          |N   |
|R3NU7OMZ4HQIEG|2          |0            |1          |N   |
|R14QJW3XF8QO1P|5          |0            |0          |N   |
|R2HB7AX0394ZGY|5          |0            |0          |N   |
|RGKMPDQGSAHR3 |5          |0            |0          |N   |
|R1DJCVPQGCV66E|5          |0            |0          |N   |
|R3V52EAWLPBFQG|3          |0            |0          |N   |
|R3DKO8J1J28QBI|2          |0            |0          |N   |
|R764DBXGRNECG |5          |1            |1          |N   |
|RW1853GAT0Z9F |5          |0            |0          |N   |
|R33GITXNUF1AD4|2          |0            |0          |N   |
|R1H7AVM81TAYRV|1          |2            |2          |N   |
|R2ZOYAQZNNZZWV|5          |0            |0          |N   |
|R2FN1H3CGW6J8H|1          |0           

In [16]:
vine_table_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)



# Push to AWS RDS Instace

In [17]:
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://mypostgresdb.cqdxhnrowhog.us-east-1.rds.amazonaws.com:5432/homework_level_1_db"
configuration = {"user": "root", 
          "password": "<password>", 
          "driver": "org.postgresql.Driver"}

In [18]:
# Write DataFrame to review_id_table table in RDS
review_id_table_df.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=configuration)

In [19]:
# Write DataFrame to products table in RDS
products_df.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=configuration)

In [20]:
# Write DataFrame to customers table in RDS
customers_df.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=configuration)

In [21]:
# Write DataFrame to vine_table table in RDS
vine_table_df.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=configuration)