In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Get:4 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Ign:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:11 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:12 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Hit:13 http://ppa

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2021-11-22 01:34:51--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2021-11-22 01:34:51 (6.01 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
from pyspark.sql import SparkSession

# Build (or reuse) the session, putting the PostgreSQL JDBC jar on the driver classpath so the
# org.postgresql.Driver class is loadable when DataFrames are written over JDBC.
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [7]:
from pyspark import SparkFiles
# Load in Apparel data from s3.amazonaws.com into a DataFrame
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Apparel_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# Tab-separated file with a header row. No timestampFormat is passed: review_date holds ISO
# yyyy-MM-dd values and is converted to a DateType explicitly with to_date() in a later cell
# (a pattern like "mm/dd/yy" would be wrong anyway -- `mm` means minute-of-hour, not month).
# NOTE(review): on some Spark versions inferSchema may infer review_date as a timestamp rather
# than a string; to_date() accepts either -- confirm the printed schema after upgrading Spark.
apparel_df = spark.read.csv(
    SparkFiles.get("amazon_reviews_us_Apparel_v1_00.tsv.gz"),
    header=True,
    sep="\t",
    inferSchema=True,
)
apparel_df.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   32158956|R1KKOXHNI8MSXU|B01KL6O72Y|      24485154|Easy Tool Stainle...|         Apparel|          4|            0|          0|   N|                Y|★ THESE REALLY DO...|These Really Do W...| 2013-01-14|
|         US|    2714559|R26SP2OPDK4HT7|B01ID3ZS5W|     363128556|V28 Women Cowl Ne...|         Apparel|          5|    

In [8]:
# Number of reviews read from the TSV, before any cleaning
raw_row_count = apparel_df.count()
print(raw_row_count)


5906333


In [9]:
# Discard any row with a null in at least one column, then report how many rows survive
apparel_df = apparel_df.na.drop()
print(apparel_df.count())

5905269


In [10]:
# Remove rows that are identical across every column, then report the cleaned row count
# (this count reflects both the null-row drop above and this de-duplication)
apparel_df = apparel_df.drop_duplicates()
print(apparel_df.count())


5905269


In [11]:
# Show the column names and the types inferred when the TSV was read (after cleaning)
apparel_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



Transform DataFrame to Table Schema

In [19]:
from pyspark.sql.functions import to_date

# Replace review_date with a DateType column parsed from yyyy-MM-dd text; other columns unchanged
apparel_df1 = apparel_df.withColumn("review_date", to_date("review_date", "yyyy-MM-dd"))
apparel_df1.printSchema()
# Last expression of the cell: displays the (name, type) pair for the converted column
apparel_df1.select("review_date").dtypes

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)



[('review_date', 'date')]

In [24]:
# One row per review with its customer/product keys and the parsed review date
review_columns = ["review_id", "customer_id", "product_id", "product_parent", "review_date"]
review_id_table = apparel_df1.select(*review_columns)
review_id_table.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1000LG8H70CNQ|     696306|B00O3A0RHY|     246430568| 2015-01-25|
|R100BCEUAIHZQC|   11807812|B00CASTERA|     600575877| 2014-06-03|
|R100EOB2GC5UYQ|   15017601|B002MLE36W|     988639002| 2014-10-26|
|R100MYSZIXWX1D|   44291413|B007HSEDVK|        174345| 2012-10-31|
|R100UMGHMFVXKC|   24539177|B00F52CUCE|     162326772| 2014-03-18|
|R101CJDLDDM19P|   10041263|B00NIX31K2|     559380483| 2014-12-16|
|R101OE6ZEL1A52|   20521864|B00CB1S40E|     248821115| 2014-02-08|
| R101PL938YJV0|   44457530|B00AF7D7VC|     656221590| 2013-11-14|
|R101QMSD2TQTAP|   14103889|B00N9KGZWA|     906968721| 2015-04-30|
|R101V43VD0FQ5Y|   23446176|B007WPF25E|     619910868| 2013-07-01|
|R102HAEXYCF1YM|   20169548|B00DECZ2KE|     913386738| 2014-02-09|
|R102KCQDXFRN3E|   18522476|B000092MPB|     673568662| 2006-06

In [25]:
# Product dimension: the source has one row per review, so each product_id repeats once per
# review of that product. Keep a single row per product_id. If one product_id carries several
# titles, dropDuplicates keeps an arbitrary one of them.
products = (
    apparel_df1
    .select(["product_id", "product_title"])
    .dropDuplicates(["product_id"])
)
products.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00O3A0RHY|Giant Panda Face ...|
|B00CASTERA|  Rothco Jungle Hats|
|B002MLE36W|Leg Avenue Women'...|
|B007HSEDVK|LAST KISS Ombré W...|
|B00F52CUCE|Magiftbox Men's S...|
|B00NIX31K2|ROEWELL® 4 Sets o...|
|B00CB1S40E|Valco Baby Snap4 ...|
|B00AF7D7VC|Louis Raphael ROS...|
|B00N9KGZWA|Women`s Push Up T...|
|B007WPF25E|Kangol Men's Josh...|
|B00DECZ2KE|London Times Wome...|
|B000092MPB|      Zip-front suit|
|B00CFET4Y2|Kanu Surf Men's C...|
|B00O3MJRRS|Modern Minute - N...|
|B003N63ET2|Champion Women's ...|
|B00IQ72SVS|Lg Dk. Green Semi...|
|B004BLAC0M|World's Softest M...|
|B00EY9QS9U|Zeagoo Women's Ch...|
|B00CBKJN64|The Beatles - Gre...|
|B00MUGCZA0|Inktastic - Red F...|
+----------+--------------------+
only showing top 20 rows



In [33]:
# Reviews per customer; the aggregate column comes out named "count(customer_id)"
customer_ids = apparel_df1.select("customer_id")
customers1 = customer_ids.groupBy("customer_id").agg({"customer_id": "count"})
customers1.show()

+-----------+------------------+
|customer_id|count(customer_id)|
+-----------+------------------+
|   12955905|                 5|
|    3909801|                 1|
|   17063052|                 1|
|   13245107|                 1|
|   15149043|                 5|
|    5137716|                 1|
|   42673458|                23|
|   15476378|                 1|
|   11615457|                 1|
|   52209445|                 2|
|   47277612|                18|
|     481101|                 1|
|    3207957|                 2|
|   20874267|                 3|
|   21580335|                 2|
|    4843304|                26|
|   36715887|                 2|
|    4416285|                18|
|   31663336|                 7|
|   15296593|                 2|
+-----------+------------------+
only showing top 20 rows



In [35]:
# Give the generated aggregate column a SQL-friendly name for the database table
count_col = "count(customer_id)"
customers = customers1.withColumnRenamed(count_col, "customer_count")
customers.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   12955905|             5|
|    3909801|             1|
|   17063052|             1|
|   13245107|             1|
|   15149043|             5|
|    5137716|             1|
|   42673458|            23|
|   15476378|             1|
|   11615457|             1|
|   52209445|             2|
|   47277612|            18|
|     481101|             1|
|    3207957|             2|
|   20874267|             3|
|   21580335|             2|
|    4843304|            26|
|   36715887|             2|
|    4416285|            18|
|   31663336|             7|
|   15296593|             2|
+-----------+--------------+
only showing top 20 rows



In [37]:
# Per-review rating/vote columns plus the Vine program flag
vine_columns = ["review_id", "star_rating", "helpful_votes", "total_votes", "vine"]
vine_table = apparel_df1.select(vine_columns)
vine_table.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R1000LG8H70CNQ|          5|            1|          1|   N|
|R100BCEUAIHZQC|          5|            0|          0|   N|
|R100EOB2GC5UYQ|          4|            0|          0|   N|
|R100MYSZIXWX1D|          2|            0|          0|   N|
|R100UMGHMFVXKC|          5|            0|          1|   N|
|R101CJDLDDM19P|          5|            0|          1|   N|
|R101OE6ZEL1A52|          5|            0|          1|   N|
| R101PL938YJV0|          5|            0|          0|   N|
|R101QMSD2TQTAP|          5|            0|          1|   N|
|R101V43VD0FQ5Y|          4|            1|          1|   N|
|R102HAEXYCF1YM|          5|            0|          0|   N|
|R102KCQDXFRN3E|          2|            1|          1|   N|
|R102QMA01NZA6Z|          5|            0|          0|   N|
|R102S2K1PLWHI0|          4|            

Write DataFrame to RDS

In [56]:
# Configuration for RDS instance.
# Connection details come from the environment so credentials are never stored in the notebook:
#   RDS_JDBC_URL  e.g. jdbc:postgresql://<rds endpoint>:5432/databasename
#   RDS_USER      database user (defaults to "root")
#   RDS_PASSWORD  database password; if unset, it is prompted for interactively.
import os
from getpass import getpass

# "append" adds rows to existing tables; re-running the write cells inserts the rows again.
mode = "append"
jdbc_url = os.environ.get("RDS_JDBC_URL", "jdbc:postgresql://<rds endpoint>:5432/databasename")
config = {"user": os.environ.get("RDS_USER", "root"),
          "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
          "driver": "org.postgresql.Driver"}

In [57]:
# Write review_id_table to the RDS table of the same name, using the save mode and JDBC settings from the config cell
review_id_table.write.mode(mode).jdbc(jdbc_url, "review_id_table", properties=config)

In [58]:
# Write the products DataFrame to the RDS "products" table, using the save mode and JDBC settings from the config cell
products.write.mode(mode).jdbc(jdbc_url, "products", properties=config)

In [59]:
# Write per-customer review counts to the RDS "customers" table, using the save mode and JDBC settings from the config cell
customers.write.mode(mode).jdbc(jdbc_url, "customers", properties=config)

In [60]:
# Write vine_table to the RDS table of the same name, using the save mode and JDBC settings from the config cell
vine_table.write.mode(mode).jdbc(jdbc_url, "vine_table", properties=config)