In [1]:
import os
# Find the latest version of spark 3.2  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.3'
spark_version = 'spark-3.2.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.2.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.2.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3.2"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease
0% [Connecting to archive.ubuntu.com (91.189.91.39)] [Connecting to security.ub                                                                               Hit:2 http://archive.ubuntu.com/ubuntu focal InRelease
0% [Waiting for headers] [Waiting for headers] [Connecting to ppa.launchpad.net                                                                               Hit:3 http://archive.ubuntu.com/ubuntu focal-updates InRelease
0% [Waiting for headers] [Waiting for headers] [Connecting to ppa.launchpad.net                                                                               Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  InRelease
0% [Waiting for headers] [Waiting for headers] [Connecting to ppa.launchpad.net                                                                               Hit:5 http://archive.ubuntu.com/

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2023-03-04 02:47:04--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.5’


2023-03-04 02:47:04 (10.1 MB/s) - ‘postgresql-42.2.9.jar.5’ saved [914037/914037]



In [3]:
from pyspark.sql import SparkSession

# Build (or reuse) the session with the PostgreSQL JDBC jar on the driver classpath.
spark = (
    SparkSession.builder
    .appName("bigdataetl-db")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

# **Extract the Amazon Data into a Spark DataFrame**

In [4]:
# Read in data from S3 Buckets
from pyspark import SparkFiles

# The file name is kept in one place: it is needed for the download URL and
# again to look up the local copy through SparkFiles.
reviews_file = "amazon_reviews_us_Music_v1_00.tsv.gz"
url = f"https://s3.amazonaws.com/amazon-reviews-pds/tsv/{reviews_file}"
spark.sparkContext.addFile(url)

# Tab-separated file with a header row; column types are inferred from the data.
music = spark.read.csv(SparkFiles.get(reviews_file), sep='\t', header=True, inferSchema=True)
music.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   10140119|R3LI5TRP3YIDQL|B00TXH4OLC|     384427924|Whatever's for Us...|           Music|          5|            0|          0|   N|                Y|          Five Stars|Love this CD alon...| 2015-08-31|
|         US|   27664622|R3LGC3EKEG84PX|B00B6QXN6U|     831769051|Same Trailer Diff...|           Music|          5|    

In [5]:
# Number of rows in `music` as loaded, before null rows are dropped in the next cell.
music.count()

4751577

In [6]:
# Remove every row that contains a null in any column, then report how many rows are left
music = music.na.drop()
print(music.count())

4750852


# **Transform the Data**

In [7]:
# Print the column names and data types of `music` as a tree
music.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [8]:
# Changing datatypes
# Source Link: https://stackoverflow.com/questions/32284620/how-to-change-a-dataframe-column-from-string-type-to-double-type-in-pyspark

from pyspark.sql.types import TimestampType, IntegerType

# Columns that must be integers. withColumn on an existing column name replaces
# that column in place, so the column order of `music` is unchanged.
integer_columns = ["customer_id", "product_parent", "star_rating", "helpful_votes", "total_votes"]
for column_name in integer_columns:
    music = music.withColumn(column_name, music[column_name].cast(IntegerType()))

# review_date is read as a string; store it as a timestamp.
music = music.withColumn("review_date", music["review_date"].cast(TimestampType()))

# Print schema
music.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



### **Create Review ID Table**

In [9]:
# One row per review: its id plus the customer, product and date it refers to
review_id_df = music.select(
    "review_id",
    "customer_id",
    "product_id",
    "product_parent",
    "review_date",
)
review_id_df.show()

+--------------+-----------+----------+--------------+-------------------+
|     review_id|customer_id|product_id|product_parent|        review_date|
+--------------+-----------+----------+--------------+-------------------+
|R3LI5TRP3YIDQL|   10140119|B00TXH4OLC|     384427924|2015-08-31 00:00:00|
|R3LGC3EKEG84PX|   27664622|B00B6QXN6U|     831769051|2015-08-31 00:00:00|
| R9PYL3OYH55QY|   45946560|B001GCZXW6|      14067376|2015-08-31 00:00:00|
|R3PWBAWUS4NT0Q|   15146326|B000003EK6|     566295619|2015-08-31 00:00:00|
|R15LYP3O51UU9E|   16794688|B00N1F0BKK|     210426072|2015-08-31 00:00:00|
|R1AD7L0CC3DSRI|   32203364|B00V7KAO7Q|     360249504|2015-08-31 00:00:00|
|R32FE8Y45QV434|    1194276|B000094Q4P|      20972283|2015-08-31 00:00:00|
|R3NM4MZ4XWL43Q|   45813052|B00JMK0P1I|     378049560|2015-08-31 00:00:00|
|R3H4FXX6Q7I37D|   12795687|B008OW1S3O|     141620702|2015-08-31 00:00:00|
|R30L5PET7LFFDC|   36673840|B00VI2L3L4|     920557135|2015-08-31 00:00:00|
|  REFRE1LEKLAF|   494535

### **Create Products Table**

In [10]:
# Distinct (product_id, product_title) pairs
products_df = (
    music
    .select("product_id", "product_title")
    .dropDuplicates(["product_id", "product_title"])
)
products_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B001FOJ76S|         The Promise|
|B000NJWSE4|                Free|
|B00002CF4Z|The Best of Steve...|
|B000CQM4T2|        Live Trucker|
|B000002QG8|            Float On|
|B003JYOFIW|Something for Eve...|
|B004MY64NM|             Manhole|
|B007I8TCD8|           Do Things|
|B001CY2ELQ|               Lenka|
|B00BQ1DBH6|The Best From The...|
|B00VXGTJMU|   Django and Jimmie|
|B00MI9OZ0W|BAYONETTA 2 Origi...|
|B003DC881A|   Chamberlain Waits|
|B000003MZG|            Africano|
|B00005QIUW|Christmas Through...|
|B000WSTAEC|Haendel: La Passi...|
|B002PXJZIS|Dont Stop Believi...|
|B00U646N7E|Directions Home (...|
|B0007MR1L2|Hummel: Septet, O...|
|B00AK77XUS|     The Flower Lane|
+----------+--------------------+
only showing top 20 rows



### **Create Customers Table**

In [11]:
from pyspark.sql.functions import desc

# Count the reviews per customer, sort by that count (largest first),
# then give the count column its final name.
customers_df = (
    music.groupby("customer_id")
    .agg({"customer_id": "count"})
    .orderBy(desc("count(customer_id)"))
    .withColumnRenamed("count(customer_id)", "customer_count")
)
customers_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   50736950|          7168|
|   38214553|          5411|
|   51184997|          5369|
|   18116317|          4220|
|   23267387|          4023|
|   50345651|          3792|
|   14539589|          2896|
|   15725862|          2842|
|   19380211|          2592|
|   20018062|          2568|
|   51381678|          2457|
|   37455882|          2370|
|   47924228|          2352|
|   50441674|          2282|
|   29791894|          2246|
|   22073263|          2213|
|   34639163|          2179|
|   25527589|          2127|
|   47423754|          2120|
|   50503261|          2119|
+-----------+--------------+
only showing top 20 rows



### **Create Vine Table**

In [12]:
# Per-review rating and vote columns together with the vine flag
vine_df = music.select(
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
)
vine_df.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R3LI5TRP3YIDQL|          5|            0|          0|   N|
|R3LGC3EKEG84PX|          5|            0|          0|   N|
| R9PYL3OYH55QY|          5|            0|          1|   N|
|R3PWBAWUS4NT0Q|          3|            0|          0|   N|
|R15LYP3O51UU9E|          5|            0|          0|   N|
|R1AD7L0CC3DSRI|          5|            0|          0|   N|
|R32FE8Y45QV434|          5|            0|          0|   N|
|R3NM4MZ4XWL43Q|          5|            1|          2|   N|
|R3H4FXX6Q7I37D|          4|            0|          0|   N|
|R30L5PET7LFFDC|          5|            1|          1|   N|
|  REFRE1LEKLAF|          4|            1|          1|   N|
|R3JTJ5EQN74E9H|          5|            0|          0|   N|
|R1W2F091LCOAW5|          5|            0|          0|   N|
| RYUMFQRRB1FNM|          3|            

# **Load the DataFrames to RDS**

In [13]:
import os
from getpass import getpass

# "append" adds rows to the existing tables, so re-running the write cells
# inserts the same rows a second time.
mode = "append"
jdbc_url="jdbc:postgresql://bigdataetl-db.claqzv8ic3l5.us-east-1.rds.amazonaws.com:5432/big_data2_etl_db"

# Database credentials are not stored in the notebook: they are read from the
# RDS_USER / RDS_PASSWORD environment variables, with an interactive prompt
# (password input hidden) when a variable is unset or empty.
config = {
    "user": os.environ.get("RDS_USER") or input("RDS user: "),
    "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
    "driver": "org.postgresql.Driver",
}

In [14]:

# Send the rows of review_id_df to the `review_id_table` table in RDS
review_id_df.write.jdbc(
    url=jdbc_url,
    table="review_id_table",
    mode=mode,
    properties=config,
)

In [15]:
# Send the rows of products_df to the `products` table in RDS
products_df.write.jdbc(
    url=jdbc_url,
    table="products",
    mode=mode,
    properties=config,
)

In [16]:
# Send the rows of customers_df to the `customers` table in RDS
customers_df.write.jdbc(
    url=jdbc_url,
    table="customers",
    mode=mode,
    properties=config,
)

In [17]:
# Send the rows of vine_df to the `vine_table` table in RDS
vine_df.write.jdbc(
    url=jdbc_url,
    table="vine_table",
    mode=mode,
    properties=config,
)