In [19]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.39)] [                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
                                                                               Hit:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Hit:4 http://archive.ubuntu.com/ubuntu bionic-backports InRelease
Hit:5 http://security.ubuntu.com/ubuntu bionic-security InRelease
Ign:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:9 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:10 http://ppa.launchpad.n

In [20]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar


--2022-10-09 18:35:58--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.1’


2022-10-09 18:35:58 (5.00 MB/s) - ‘postgresql-42.2.9.jar.1’ saved [914037/914037]



In [21]:
from pyspark.sql import SparkSession

# Create (or reuse) the session named "CloudETL", with the PostgreSQL JDBC jar
# placed on the driver classpath.
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [22]:
from pyspark import SparkFiles
# Register the gzipped, tab-separated jewelry reviews file from S3 with Spark
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Jewelry_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# Read the fetched copy into a DataFrame: first row is the header and column
# types are inferred from the data.
# NOTE(review): in Spark datetime patterns "mm" is minute-of-hour (month is
# "MM"), and the inferred schema reports review_date as string, so this
# timestampFormat does not appear to take effect - confirm the intended pattern.
jewelry_df = spark.read.csv(
    SparkFiles.get("amazon_reviews_us_Jewelry_v1_00.tsv.gz"),
    sep='\t',
    header=True,
    inferSchema=True,
    timestampFormat="mm/dd/yy",
)
jewelry_df.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   50423057|R135Q3VZ4DQN5N|B00JWXFDMG|     657335467|Everbling Purple ...|         Jewelry|          5|            0|          0|   N|                Y|           Beauties!|so beautiful even...| 2015-08-31|
|         US|   11262325|R2N0QQ6R4T7YRY|B00W5T1H9W|      26030170|925 Sterling Silv...|         Jewelry|          5|    

In [23]:
# Number of rows in the freshly loaded DataFrame (triggers a full scan)
total_rows = jewelry_df.count()
print(total_rows)

1767753


In [24]:
# Remove every row that has a null in any column, then report how many remain
jewelry_df = jewelry_df.na.drop()
print(jewelry_df.count())

1767394


In [25]:
# Remove rows that are exact duplicates across all columns, then report how
# many remain
jewelry_df = jewelry_df.drop_duplicates()
print(jewelry_df.count())

1767394


In [26]:
# Schema: print the column names, inferred types and nullability of the
# cleaned DataFrame so the casts in the cells below can be checked against it
jewelry_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [27]:
# Project the columns needed for the review table and preview the first 10 rows
review_id_df = jewelry_df.select(
    'review_id',
    'customer_id',
    'product_id',
    'product_parent',
    'review_date',
)
review_id_df.show(n=10)

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1000SHT04I05Z|   10989386|B00BC9ZCFQ|     412268594| 2013-08-04|
|R100L48N7NQJOM|   18778022|B0098Z105W|     498518502| 2014-09-29|
|R100W5CE099235|   23281221|B00G4EKH86|     258430791| 2014-05-23|
|R101L2Z28VA8ZM|   21148835|B00O96VYY2|     736039112| 2014-12-25|
|R101NZ4EWORURU|   45435568|B00C1G4LME|     345456390| 2013-09-26|
|R103I4TR9XD1VU|   15934715|B0040UMTDW|     913517322| 2012-01-01|
|R10451EZEGTTJ4|   18019354|B008SOTSEY|     707188081| 2014-10-23|
|R10454XRBFGGLU|   23200310|B003XYMXLY|     665660556| 2015-02-19|
|R1046FPO0U5JQZ|    6629323|B00VKNYBR4|     614133040| 2015-07-23|
|R104Q26FWKPR4N|   39979029|B003XT5ZZK|     290581820| 2014-05-02|
+--------------+-----------+----------+--------------+-----------+
only showing top 10 rows



In [28]:
# Pin each review column to an explicit type (review_date goes from string to
# date), then print the resulting schema
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,BooleanType,DateType,IntegerType
review_id_df = (
    review_id_df
    .withColumn("review_id", col("review_id").cast(StringType()))
    .withColumn("customer_id", col("customer_id").cast(IntegerType()))
    .withColumn("product_id", col("product_id").cast(StringType()))
    .withColumn("product_parent", col("product_parent").cast(IntegerType()))
    .withColumn("review_date", col("review_date").cast(DateType()))
)

review_id_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: date (nullable = true)



In [29]:
# Creating records for products dataframe: one row per product_id
products_df = jewelry_df.select(['product_id', 'product_title'])
# dropDuplicates returns a new DataFrame rather than modifying in place, so the
# result must be assigned back or the duplicates are silently kept.
products_df = products_df.dropDuplicates(['product_id'])
products_df.show(10)

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00BC9ZCFQ|Sterling Silver S...|
|B0098Z105W|925 Sterling Silv...|
|B00G4EKH86|14k Yellow Gold H...|
|B00O96VYY2|Angel Silver Nugz...|
|B00C1G4LME|Heirloom Finds Bl...|
|B0040UMTDW|Blue White .925 S...|
|B008SOTSEY|Ear Gauges 00G - ...|
|B003XYMXLY|Valentines Sterli...|
|B00VKNYBR4|Flower Petals 5/1...|
|B003XT5ZZK|Sterling Silver D...|
+----------+--------------------+
only showing top 10 rows



In [30]:
# Pin both product columns to string explicitly, then print the schema
products_df = (
    products_df
    .withColumn("product_id", col("product_id").cast(StringType()))
    .withColumn("product_title", col("product_title").cast(StringType()))
)

products_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_title: string (nullable = true)



In [31]:
# Start the customers table from the customer_id column alone (one row per
# review at this point) and preview it
customers_df = jewelry_df.select('customer_id')
customers_df.show(n=10)

+-----------+
|customer_id|
+-----------+
|   10989386|
|   18778022|
|   23281221|
|   21148835|
|   45435568|
|   15934715|
|   18019354|
|   23200310|
|    6629323|
|   39979029|
+-----------+
only showing top 10 rows



In [32]:
# Collapse to one row per customer_id with the number of rows seen for it
# (the aggregate column is named "count")
customers_df = customers_df.groupBy('customer_id').count()
customers_df.show(n=10)

+-----------+-----+
|customer_id|count|
+-----------+-----+
|   14542376|   11|
|     195263|    2|
|     726270|    1|
|   43776283|    1|
|    2074030|    8|
|   18503260|    2|
|   46017005|    1|
|   52807885|    1|
|   47816064|    1|
|   43156541|    1|
+-----------+-----+
only showing top 10 rows



In [33]:
# Rename the aggregate column from "count" to "customer_count" and preview
customers_df = customers_df.withColumnRenamed(existing='count', new='customer_count')
customers_df.show(n=10)

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   14542376|            11|
|     195263|             2|
|     726270|             1|
|   43776283|             1|
|    2074030|             8|
|   18503260|             2|
|   46017005|             1|
|   52807885|             1|
|   47816064|             1|
|   43156541|             1|
+-----------+--------------+
only showing top 10 rows



In [34]:
# Cast both customer columns to integer, then print the schema
customers_df = (
    customers_df
    .withColumn("customer_id", col("customer_id").cast(IntegerType()))
    .withColumn("customer_count", col("customer_count").cast(IntegerType()))
)

customers_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: integer (nullable = false)



In [35]:
# Project the columns needed for the vine table and preview the first 10 rows
vine_table_df = jewelry_df.select(
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
)
vine_table_df.show(n=10)

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R1000SHT04I05Z|          4|            1|          1|   N|
|R100L48N7NQJOM|          5|            0|          0|   N|
|R100W5CE099235|          1|            3|          3|   N|
|R101L2Z28VA8ZM|          5|            0|          0|   N|
|R101NZ4EWORURU|          5|            0|          0|   N|
|R103I4TR9XD1VU|          4|            0|          1|   N|
|R10451EZEGTTJ4|          5|            0|          0|   N|
|R10454XRBFGGLU|          4|            0|          0|   N|
|R1046FPO0U5JQZ|          4|            1|          1|   N|
|R104Q26FWKPR4N|          5|            2|          3|   N|
+--------------+-----------+-------------+-----------+----+
only showing top 10 rows



In [36]:
# Pin each vine column to an explicit type (star_rating goes from string to
# integer), then print the resulting schema
vine_table_df = (
    vine_table_df
    .withColumn("review_id", col("review_id").cast(StringType()))
    .withColumn("star_rating", col("star_rating").cast(IntegerType()))
    .withColumn("helpful_votes", col("helpful_votes").cast(IntegerType()))
    .withColumn("total_votes", col("total_votes").cast(IntegerType()))
    .withColumn("vine", col("vine").cast(StringType()))
)

vine_table_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)



In [37]:
# RDS configuration. Real connection details are read from the environment
# (RDS_JDBC_URL, RDS_USER, RDS_PASSWORD) so they never have to be typed into
# the notebook. When a variable is unset the dummy placeholder below is used,
# and the JDBC writes in the next cell will fail to connect.
import os

mode = "append"
jdbc_url = os.environ.get("RDS_JDBC_URL", "jdbc:postgresql://my-amazon-endpoint:5432/postgres")
config = {"user": os.environ.get("RDS_USER", "postgres"),
          "password": os.environ.get("RDS_PASSWORD", "of-course-this-is-not-my-real-password"),
          "driver":"org.postgresql.Driver"}

In [38]:
# JDBC write tables: append each DataFrame to its PostgreSQL table using the
# jdbc_url / mode / config defined in the RDS configuration cell.
# (Fails to connect while that cell still holds the dummy connection details.)
# The products and customers frames are named products_df and customers_df in
# the cells above; the previous *_table_df names were never defined.
review_id_df.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)
products_df.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)
customers_df.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)
vine_table_df.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)

Py4JJavaError: ignored