In [18]:
import os

spark_version = 'spark-3.3.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()


0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Waiting for headers] [Waiting for headers] [Connected to cloud.r-project.or                                                                               Get:2 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
                                                                               Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
                                                                               Hit:4 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Get:5 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [83.3 kB]
Hit:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Ign:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:8 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:9 https://develo

In [19]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar


--2022-09-29 19:01:03--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.2’


2022-09-29 19:01:04 (4.98 MB/s) - ‘postgresql-42.2.9.jar.2’ saved [914037/914037]



In [20]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [21]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Beauty_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

beauty_df = spark.read.option('header', 'true').csv(SparkFiles.get("amazon_reviews_us_Beauty_v1_00.tsv.gz"), inferSchema=True, sep='\t', timestampFormat="yyyy-mm-dd")
beauty_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+---------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|          review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+---------------------+-------------------+
|         US|    1797882|R3I2DHQBR577SS|B001ANOOOE|       2102612|The Naked Bee Vit...|          Beauty|          5|            0|          0|   N|                Y|          Five Stars| Love this, excell...|2015-01-31 00:08:00|
|         US|   18381298|R1QNE9NQFJC2Y4|B0016J22EQ|     106393691|Alba Botanica Sun.

In [22]:
# Count number of records
beauty_df.count()

5115666

In [23]:
# Drop null values and count new total records
beauty_df = df.dropna()
beauty_df.count()

5114733

In [24]:
# Print schema
beauty_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [25]:
# Changing datatypes

from pyspark.sql.types import IntegerType

beauty_df = df.withColumn("star_rating", df["star_rating"].cast(IntegerType()))\


# Print schema
beauty_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [26]:
# Create review dataframe to match review_id_table table
review_id_df = df.select(["review_id","customer_id","product_id", "product_parent","review_date"])
review_id_df.show()

+--------------+-----------+----------+--------------+-------------------+
|     review_id|customer_id|product_id|product_parent|        review_date|
+--------------+-----------+----------+--------------+-------------------+
|R3I2DHQBR577SS|    1797882|B001ANOOOE|       2102612|2015-01-31 00:08:00|
|R1QNE9NQFJC2Y4|   18381298|B0016J22EQ|     106393691|2015-01-31 00:08:00|
|R3LIDG2Q4LJBAO|   19242472|B00HU6UQAG|     375449471|2015-01-31 00:08:00|
|R3KSZHPAEVPEAL|   19551372|B002HWS7RM|     255651889|2015-01-31 00:08:00|
| RAI2OIG50KZ43|   14802407|B00SM99KWU|     116158747|2015-01-31 00:08:00|
|R1R30FA4RB5P54|    2909389|B000NYL1Z6|     166146615|2015-01-31 00:08:00|
|R30IJKCGJBGPJH|   19397215|B001SYWTFG|     111742328|2015-01-31 00:08:00|
|R18GLJJPVQ1OVH|    3195210|B005F2EVMQ|     255803087|2015-01-31 00:08:00|
| R8TVYIJXLYJT0|   52216383|B00M1SUW7K|     246816549|2015-01-31 00:08:00|
|R1CJGF6M3PVHEZ|   10278216|B001KYQA1S|       9612905|2015-01-31 00:08:00|
|R2A30ALEGLMCGN|   246554

In [27]:
# Create product dataframe to match products table(only unique values)
from pyspark.sql.functions import asc
products_df = df.select(["product_id","product_title"])
products_df = products_df.dropDuplicates()
products_df = products_df.orderBy(asc("product_id"))
products_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|000004458X|Old Spice Shave C...|
|0293000050|Salvatore Ferraga...|
|0451192362|how stella got he...|
|0539764965|Fair & Lovely Mul...|
|054501218X|Nu Skin Tru Face ...|
|0558925278|Eco Friendly Ecot...|
|0594169909|Pink Wide Stripe ...|
|0702001996|  Mastiha Hand Cream|
|0733001998| Mastiha Body Lotion|
|0737104473|Hello Kitty Lustr...|
|0765075180|Yves Saint Lauren...|
|0782418104|NYX Cosmetics Lon...|
|078247425X|3 L'occitane Divi...|
|0785374426|Charlotte Russe R...|
|0788861654|Scrubs: The Compl...|
|0925480053|10 Minute Superch...|
|0977217213|Water Blessing La...|
|0982629281|INSPIRE Hair Fash...|
|0982633025|Refillable Powder...|
|1019655070|Schwarzkopf BC Bo...|
+----------+--------------------+
only showing top 20 rows



In [28]:
# Create customer dataframe to match customers table
from pyspark.sql.functions import desc

customers_df = df.groupby("customer_id").agg({"customer_id":"count"})
customers_df = customers_df.orderBy(desc("count(customer_id)"))
customers_df = customers_df.withColumnRenamed("count(customer_id)", "customer_count") 
customers_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   36771761|           871|
|   32405532|           721|
|   10942711|           685|
|   37446839|           587|
|   50199793|           564|
|   12201275|           519|
|    4808156|           508|
|   48233483|           440|
|   52520442|           378|
|   39789300|           346|
|   18609243|           343|
|   37337835|           337|
|   42799904|           314|
|   21012418|           288|
|   10592389|           284|
|   52433525|           250|
|   51126995|           245|
|   52824002|           235|
|   18715781|           230|
|   31120312|           219|
+-----------+--------------+
only showing top 20 rows



In [29]:
# Create vine dataframe to match vine_table table

from pyspark.sql.functions import col

vine_table_df = df.select(["review_id","star_rating","helpful_votes", "total_votes","vine"])

# # Filter for only columns from Amazon's Vine program
# vine_table_df = vine_table_df.filter(col("vine")  == "Y")
vine_table_df.show() 

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R3I2DHQBR577SS|          5|            0|          0|   N|
|R1QNE9NQFJC2Y4|          5|            0|          0|   N|
|R3LIDG2Q4LJBAO|          5|            0|          0|   N|
|R3KSZHPAEVPEAL|          5|            0|          0|   N|
| RAI2OIG50KZ43|          5|            0|          0|   N|
|R1R30FA4RB5P54|          4|            0|          0|   N|
|R30IJKCGJBGPJH|          5|            0|          0|   N|
|R18GLJJPVQ1OVH|          5|            0|          0|   N|
| R8TVYIJXLYJT0|          5|            0|          0|   N|
|R1CJGF6M3PVHEZ|          1|            0|          2|   N|
|R2A30ALEGLMCGN|          4|            1|          1|   N|
|R134GP1Y3EQZ1L|          5|            0|          0|   N|
| R6CE3SOIUJGP4|          4|            1|          1|   N|
|R37CDLQZUPDZVM|          5|            

In [33]:
# Configuration for RDS instance
mode="append"
jdbc_url = "jdbc:postgresql://database-1.c1lahiassvlb.us-west-1.rds.amazonaws.com:5432/beauty_reviews"
config = {"user":"postgres",
          "password": "March594!",
          "driver":"org.postgresql.Driver"}

In [35]:
# Write DataFrame to review_id_df table in RDS

review_id_df.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/content/spark-3.3.0-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/content/spark-3.3.0-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: ignored

In [None]:
# Write DataFrame to products table in RDS

products_df.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)

In [None]:
# Write DataFrame to customers table in RDS

customers_df.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)

In [None]:
# Write DataFrame to vine_table table in RDS

vine_table_df.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)