In [1]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.1.1'
spark_version = 'spark-3.1.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Conn                                                                               Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Conn                                                                               Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Wait                                                                               Hit:4 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:7 http://ppa.launchpad

In [2]:
# For connection to Postgres
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2021-03-12 00:04:50--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.4’


2021-03-12 00:04:50 (5.57 MB/s) - ‘postgresql-42.2.9.jar.4’ saved [914037/914037]



In [3]:
from pyspark.sql import SparkSession

# Build (or reuse) the session, with the Postgres JDBC jar on the driver classpath.
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [4]:
from pyspark import SparkFiles

# Register the gzipped, tab-separated reviews file hosted on S3 with Spark
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Home_Entertainment_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# Read the registered file into a DataFrame, treating the first line as a header
df = (
    spark.read
    .option('header', 'true')
    .csv(SparkFiles.get("amazon_reviews_us_Home_Entertainment_v1_00.tsv.gz"), sep="\t")
)

# Preview the first 10 rows
df.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|  product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|     179886| RY01SAV7HZ8QO|B00NTI0CQ2|     667358431|Aketek 1080P LED ...|Home Entertainment|          4|            0|          0|   N|                Y|good enough for m...|not the best pict...| 2015-08-31|
|         US|   37293769|R1XX8SDGJ4MZ4L|B00BUCLVZU|     621695622|TiVo Mini with IR...|Home Entertainment|      

In [5]:
# Examine the schema of the DataFrame that was read without an explicit schema
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [6]:
# import struct fields that we can use
from pyspark.sql.types import StructField, StringType, IntegerType, DateType, StructType

In [7]:
# Create two lists of struct fields describing the same 15 columns.
# They differ only in the nullable flag (third argument):
#   schema1 declares review_id as non-nullable;
#   schema2 declares customer_id and product_id as non-nullable.
schema1 = [
    StructField("marketplace", StringType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("review_id", StringType(), False),
    StructField("product_id", StringType(), True),
    StructField("product_parent", IntegerType(), True),
    StructField("product_title", StringType(), True),
    StructField("product_category", StringType(), True),
    StructField("star_rating", IntegerType(), True),
    StructField("helpful_votes", IntegerType(), True),
    StructField("total_votes", IntegerType(), True),
    StructField("vine", StringType(), True),
    StructField("verified_purchase", StringType(), True),
    StructField("review_headline", StringType(), True),
    StructField("review_body", StringType(), True),
    StructField("review_date", DateType(), True),
]
schema2 = [
    StructField("marketplace", StringType(), True),
    StructField("customer_id", IntegerType(), False),
    StructField("review_id", StringType(), True),
    StructField("product_id", StringType(), False),
    StructField("product_parent", IntegerType(), True),
    StructField("product_title", StringType(), True),
    StructField("product_category", StringType(), True),
    StructField("star_rating", IntegerType(), True),
    StructField("helpful_votes", IntegerType(), True),
    StructField("total_votes", IntegerType(), True),
    StructField("vine", StringType(), True),
    StructField("verified_purchase", StringType(), True),
    StructField("review_headline", StringType(), True),
    StructField("review_body", StringType(), True),
    StructField("review_date", DateType(), True),
]
print(schema1)
print(schema2)

[StructField(marketplace,StringType,true), StructField(customer_id,IntegerType,true), StructField(review_id,StringType,false), StructField(product_id,StringType,true), StructField(product_parent,IntegerType,true), StructField(product_title,StringType,true), StructField(product_category,StringType,true), StructField(star_rating,IntegerType,true), StructField(helpful_votes,IntegerType,true), StructField(total_votes,IntegerType,true), StructField(vine,StringType,true), StructField(verified_purchase,StringType,true), StructField(review_headline,StringType,true), StructField(review_body,StringType,true), StructField(review_date,DateType,true)]
[StructField(marketplace,StringType,true), StructField(customer_id,IntegerType,false), StructField(review_id,StringType,true), StructField(product_id,StringType,false), StructField(product_parent,IntegerType,true), StructField(product_title,StringType,true), StructField(product_category,StringType,true), StructField(star_rating,IntegerType,true), Stru

In [8]:
# Wrap each field list in a StructType so it can be handed to the reader
final1 = StructType(schema1)
final2 = StructType(schema2)
print(final1, final2, sep="\n")

StructType(List(StructField(marketplace,StringType,true),StructField(customer_id,IntegerType,true),StructField(review_id,StringType,false),StructField(product_id,StringType,true),StructField(product_parent,IntegerType,true),StructField(product_title,StringType,true),StructField(product_category,StringType,true),StructField(star_rating,IntegerType,true),StructField(helpful_votes,IntegerType,true),StructField(total_votes,IntegerType,true),StructField(vine,StringType,true),StructField(verified_purchase,StringType,true),StructField(review_headline,StringType,true),StructField(review_body,StringType,true),StructField(review_date,DateType,true)))
StructType(List(StructField(marketplace,StringType,true),StructField(customer_id,IntegerType,false),StructField(review_id,StringType,true),StructField(product_id,StringType,false),StructField(product_parent,IntegerType,true),StructField(product_title,StringType,true),StructField(product_category,StringType,true),StructField(star_rating,IntegerType,t

In [9]:
# Read the same file twice more, once with each explicit schema
reviews_path = SparkFiles.get("amazon_reviews_us_Home_Entertainment_v1_00.tsv.gz")
df1 = spark.read.option('header', 'true').csv(reviews_path, schema=final1, sep="\t")
df2 = spark.read.option('header', 'true').csv(reviews_path, schema=final2, sep="\t")

In [10]:
# Show the first 10 rows of DataFrame 1 (read with schema final1)
df1.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|  product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|     179886| RY01SAV7HZ8QO|B00NTI0CQ2|     667358431|Aketek 1080P LED ...|Home Entertainment|          4|            0|          0|   N|                Y|good enough for m...|not the best pict...| 2015-08-31|
|         US|   37293769|R1XX8SDGJ4MZ4L|B00BUCLVZU|     621695622|TiVo Mini with IR...|Home Entertainment|      

In [11]:
# Show the first 10 rows of DataFrame 2 (read with schema final2)
df2.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|  product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|     179886| RY01SAV7HZ8QO|B00NTI0CQ2|     667358431|Aketek 1080P LED ...|Home Entertainment|          4|            0|          0|   N|                Y|good enough for m...|not the best pict...| 2015-08-31|
|         US|   37293769|R1XX8SDGJ4MZ4L|B00BUCLVZU|     621695622|TiVo Mini with IR...|Home Entertainment|      

In [12]:
# Examine the new schema. The declared column types are applied, but every
# column is reported as nullable = true even though the static schema marks
# review_id as non-nullable.
# NOTE(review): this is probably not an error -- Spark's file-based readers
# appear to relax a user-supplied schema to nullable on load; confirm against
# the Spark documentation before relying on the nullable flags.
df1.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)



In [13]:
# Count the number of records (rows) in each DataFrame
for frame in (df1, df2):
    print(frame.count())

705889
705889


In [14]:
# Discard every row that contains at least one null value, then re-count
df1 = df1.dropna(how="any")
df2 = df2.dropna(how="any")
for frame in (df1, df2):
    print(frame.count())

705850
705850
705850
705850


In [15]:
# Transform the dataset to fit the tables in the schema file.
# 1 use df1 to create a DataFrame to match 'review_id_table' table
review_id_df = df1.select(
    "review_id",
    "customer_id",
    "product_id",
    "product_parent",
    "review_date",
)
review_id_df.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R101H086VRVWKX|   30823406|B00882OHL0|     426839100| 2014-07-20|
|R103ELYARCJ7AW|   11418164|B006ZO8HCS|     404339813| 2012-07-10|
|R105L6OCRSI8Q4|   43557302|B00D79AQ14|     526333387| 2014-09-04|
|R1062NR7LOXBOT|   18188055|B008MWLHX2|      65474474| 2013-03-01|
|R106SYLD7Y8UR6|   25746561|B001EZRJZE|     565314198| 2009-06-07|
|R106VR2RW71DLJ|   52845988|B00GKKI4IE|     326391602| 2015-03-22|
|R107BMMP3MA4XK|   45108921|B00I2ZBD1U|     384314603| 2014-09-30|
|R107HT8VDOS3R5|   17164769|B001418W2C|     332919308| 2008-05-30|
|R107JF6ASZ9B7D|   16071390|B00DR0PDNE|     343185803| 2014-08-31|
|R108223RUEXHXM|   30923511|B003CHOFKY|     322302573| 2013-01-22|
|R108MFG2YLCFFL|    2956912|B005TI1ILS|     375830131| 2014-12-10|
|R10AR9UYN0UTFI|   34710166|B00BFDHVAS|     293226285| 2013-06

In [19]:
# 2 use df2 to create a DataFrame to match 'products' table:
#   keep one row per distinct (product_id, product_title) pair.
# NOTE(review): distinct() de-duplicates on the pair, so a product_id that
# appears with two different titles would still yield two rows -- verify
# against the target table if product_id must be unique.
product_pairs = df2.select("product_id", "product_title")
products_df = product_pairs.distinct()
products_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00BBAG0DY|LG Electronics LA...|
|B000B63RHG|Sharp LC-37D6U Aq...|
|B008I641TE|SquareTrade 2-Yea...|
|B006L8TX94|TiVo Premiere 500...|
|B007R9RUPU|Kinivo LS210 Port...|
|B0001YHY0O|Toshiba 20AF44 20...|
|B002NNNZEA|SquareTrade 3-Yea...|
|B007F9XJW0|Sony DVPFX780 7-I...|
|B00DILTY0U|Tianle TL869 Andr...|
|B00LGJ5JKA|Excelvan 2800 Lum...|
|B00N9OT6RM|Upstar 19-Inch 72...|
|B00DCB9BEK|Fosmon Clear Scre...|
|B004NPND20|Panasonic VIERA 1...|
|B009TQNZV2|WAP3205 v2 - Drah...|
|B007JMNJFK|Samsung HW-E 2.1 ...|
|B003CCW4OS|FAVI 42-Inch 1080...|
|B000N237T6|Pioneer DV-400V H...|
|B00CIR3YJC|Patuoxun® Backup ...|
|B004O3B4MC|HQRP Wall Travel ...|
|B00BXF7I8I|Seiki 1080p 60Hz ...|
+----------+--------------------+
only showing top 20 rows



In [22]:
# Spot-check: list the rows of the raw DataFrame for a single product_id
is_sample_product = df["product_id"] == "B003CCW4OS"
df.filter(is_sample_product).select("product_id", "product_title").show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B003CCW4OS|FAVI 42-Inch 1080...|
|B003CCW4OS|FAVI 42-Inch 1080...|
|B003CCW4OS|FAVI 42-Inch 1080...|
|B003CCW4OS|FAVI 42-Inch 1080...|
|B003CCW4OS|FAVI 42-Inch 1080...|
+----------+--------------------+



In [24]:
# Spot-check: rows of df2 whose review_date equals 2015-08-31
on_sample_date = df2["review_date"] == "2015-08-31"
df2.filter(on_sample_date).show()

+-----------+-----------+--------------+----------+--------------+--------------------+------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|  product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   41392448|R18FHVYXC48KX0|B007ZPY6WQ|      51859103|Video Sunglasses ...|Home Entertainment|          3|            0|          0|   N|                Y|         Three Stars|no chip provided ...| 2015-08-31|
|         US|   37945800|R1BQZI1JSCA4GX|B00QHLSKOE|     885228855|Matricom G-Box Q²...|Home Entertainment|      

In [25]:
# 3 use df2 to create a DataFrame to match 'customers' table:
#   one row per customer_id with the number of rows (reviews) for that customer.
# NOTE(review): the aggregate column is named "count"; if the target table uses
# a different column name, rename it before loading -- confirm against the schema file.
reviews_per_customer = df2.groupBy("customer_id").count()
customers_df = reviews_per_customer.select("customer_id", "count")
customers_df.show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|    1999776|    1|
|   16152581|    3|
|   36934735|    1|
|   10854194|    2|
|   30355550|    1|
|   48415532|    1|
|   25475014|    4|
|   50580392|    1|
|   47897118|    1|
|   27532661|    1|
|   12568829|    4|
|   20117488|    1|
|     649345|    1|
|   49347075|    1|
|   52600605|    4|
|   10308272|    1|
|   22421107|    1|
|   10453684|    1|
|   15274896|    1|
|   19278656|    1|
+-----------+-----+
only showing top 20 rows



In [26]:
# 4 use df2 to create a DataFrame to match 'vine_table' table
vine_df = df2.select(
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
)
vine_df.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R101H086VRVWKX|          5|            0|          0|   N|
|R103ELYARCJ7AW|          1|            0|          0|   N|
|R105L6OCRSI8Q4|          5|            0|          0|   N|
|R1062NR7LOXBOT|          5|            2|          4|   N|
|R106SYLD7Y8UR6|          5|            0|          3|   N|
|R106VR2RW71DLJ|          5|            3|          3|   N|
|R107BMMP3MA4XK|          2|            0|          2|   N|
|R107HT8VDOS3R5|          5|            0|          0|   N|
|R107JF6ASZ9B7D|          4|            0|          0|   N|
|R108223RUEXHXM|          5|            0|          0|   N|
|R108MFG2YLCFFL|          5|            0|          0|   N|
|R10AR9UYN0UTFI|          2|            6|          9|   N|
|R10CMZIVS1WMDC|          3|            4|          7|   N|
|R10D4O96VXQF3J|          5|            

In [None]:
# load the DataFrames that correspond to tables into an RDS instance