In [None]:
import os

# latest spark version: 3.3.1
spark_version = 'spark-3.3.1'
os.environ['SPARK_VERSION']=spark_version

# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-11-16 16:49:20--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2022-11-16 16:49:20 (5.44 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the session. The PostgreSQL JDBC jar is put on the driver classpath
# so that the org.postgresql.Driver used by the write.jdbc calls can be loaded.
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [None]:
from pyspark import SparkFiles

# Register the remote review file with Spark so it is downloaded for this job.
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Video_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# Create the DataFrame from the tab-separated, gzip-compressed file.
# Spark datetime patterns are case-sensitive: `MM` is month and `mm` is minute-of-hour.
# The review dates are dash-separated (e.g. 2015-08-31), so the pattern is yyyy-MM-dd.
videos_DF = spark.read.option('header', 'true').csv(
    SparkFiles.get("amazon_reviews_us_Video_v1_00.tsv.gz"),
    inferSchema=True,
    sep='\t',
    timestampFormat="yyyy-MM-dd",
)
videos_DF.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   49033728|R1P1G5KZ05H6RD|6302503213|     748506413|The Night They Sa...|           Video|          5|            0|          0|   N|                Y|    Very satisfied!!|Fast shipping. Pl...| 2015-08-31|
|         US|   17857748|R106N066IUN8ZV|B000059PET|     478710180|Hamlet / Kline, N...|           Video|          5|    

* Complete the following steps for each notebook (one dataset per notebook).

  * Count the number of records (rows) in the dataset.

  * Transform the dataset to fit the tables in the [schema file](../Resources/schema.sql). Be sure that the DataFrames match in data type and in column name.

  * Load the DataFrames that correspond to tables into an RDS instance. **Note:** Loading can take up to 10 minutes per table, so make sure everything is correct before uploading.


In [None]:
# Total number of review rows loaded from the file
record_count = videos_DF.count()
print(f"Number of records in this dataset: {record_count}")

Number of records in this dataset: 380604


In [None]:
# Transform the dataset to fit the tables.
# First, inspect the inferred column names and types.
videos_DF.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [None]:
# Discard every row that has a null in any column (na.drop is the same call as dropna)
videos_DF = videos_DF.na.drop()

remaining_rows = videos_DF.count()
print(f"Number of rows after null values were removed: {remaining_rows}")

Number of rows after null values were removed: 380575


In [None]:
# Give the table columns explicit data types
from pyspark.sql.types import TimestampType, IntegerType

column_casts = {
    "customer_id": IntegerType(),
    "product_parent": IntegerType(),
    "review_date": TimestampType(),
    "star_rating": IntegerType(),
    "helpful_votes": IntegerType(),
    "total_votes": IntegerType(),
}
# withColumn on an existing name replaces that column in place, keeping column order
for column_name, target_type in column_casts.items():
    videos_DF = videos_DF.withColumn(column_name, videos_DF[column_name].cast(target_type))

# Confirm the new types (review_date is now a timestamp)
videos_DF.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [None]:
# Columns that make up review_id_table
review_columns = ["review_id", "customer_id", "product_id", "product_parent", "review_date"]
review_id_DF = videos_DF.select(*review_columns)
review_id_DF.show()

+--------------+-----------+----------+--------------+-------------------+
|     review_id|customer_id|product_id|product_parent|        review_date|
+--------------+-----------+----------+--------------+-------------------+
|R1P1G5KZ05H6RD|   49033728|6302503213|     748506413|2015-08-31 00:00:00|
|R106N066IUN8ZV|   17857748|B000059PET|     478710180|2015-08-31 00:00:00|
| R7WTAA1S5O7D9|   25551507|0788812807|     981002815|2015-08-31 00:00:00|
|R32HFMVWLYOYJK|   21025041|6302509939|     333219811|2015-08-31 00:00:00|
| RWT3H6HBVAL6G|   40943563|B00JENS2BI|     538101194|2015-08-31 00:00:00|
|R1S3T3GWUGQTW7|   17013969|6305761302|     716303278|2015-08-31 00:00:00|
|R3R0QYHA800REE|   47611685|6300157555|     134996462|2015-08-31 00:00:00|
|R1FR0EQPHPW5NM|   35680737|6300189570|     498116870|2015-08-31 00:00:00|
| RGORN81H45NI7|   10747909|B000SXQ5US|      77519275|2015-08-31 00:00:00|
|R1CNYN4ABNOJSU|     126341|B00008F22G|     917778300|2015-08-31 00:00:00|
|R2DW06821PMN40|   406768

In [None]:
# Check the column types that will be written to review_id_table
review_id_DF.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [None]:
# Create the products DataFrame to match the products table (one row per product).
# Built from videos_DF, the reviews DataFrame this notebook loads.
# NOTE(review): products is presumably keyed by product_id in schema.sql (verify there).
# De-duplicating on the key, not on (id, title), guarantees each product_id appears once,
# so the JDBC insert cannot hit a duplicate key when a product is listed under two titles.
products_DF = (
    videos_DF
    .select(["product_id", "product_title"])
    .dropDuplicates(["product_id"])
)
products_DF.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|0788806270|Bambi (Walt Disne...|
|078881107X|Kiki's Delivery S...|
|6303559018|To Catch a Yeti [...|
|6302844061|      Silkwood [VHS]|
|B00003XAMY|Wagons Roll at Ni...|
|0783216084|          Jaws [VHS]|
|6301966376|The Compleat Beat...|
|6303315429| Yoga for Life [VHS]|
|6302969204|Miracle in Milan ...|
|6301540441|As Summers Die [VHS]|
|6304611366|Loretta Young Sho...|
|1559838450|Hans Christian An...|
|6304263198|   Latcho Drom [VHS]|
|6301017250|Dead Don't Die [VHS]|
|6301782135|John Hammond: Fro...|
|6304925158|The Education of ...|
|6305403309| Dead Husbands [VHS]|
|6300152936|       Faeries [VHS]|
|B0000897E9|  Femme Fatale [VHS]|
|B0002X4MIQ|Touched By An Ang...|
+----------+--------------------+
only showing top 20 rows



In [None]:
# customers table: how many reviews each customer wrote, most active customers first
from pyspark.sql.functions import desc

customers_DF = (
    videos_DF.groupby("customer_id")
    .agg({"customer_id": "count"})
    .withColumnRenamed("count(customer_id)", "customer_count")
    .orderBy(desc("customer_count"))
)
customers_DF.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   50913245|          1005|
|   38002140|           953|
|   18116317|           898|
|   49778371|           765|
|   50763106|           648|
|   50205849|           580|
|   49355567|           567|
|   26803373|           559|
|   50736950|           534|
|    7080939|           506|
|   49837360|           422|
|   39371720|           394|
|   50303974|           391|
|   50237277|           387|
|   51299750|           382|
|   53016962|           380|
|   51010646|           334|
|   49786731|           328|
|   49818928|           315|
|   43921586|           310|
+-----------+--------------+
only showing top 20 rows



In [None]:
# Create the vine DataFrame to match vine_table.

# Column-expression helper; not used by this cell's select
from pyspark.sql.functions import col

# Every review is kept (no filtering): rating, vote counts, and the Vine program flag
vine_table_DF = videos_DF.select("review_id", "star_rating", "helpful_votes", "total_votes", "vine")
vine_table_DF.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R1P1G5KZ05H6RD|          5|            0|          0|   N|
|R106N066IUN8ZV|          5|            0|          0|   N|
| R7WTAA1S5O7D9|          4|            0|          0|   N|
|R32HFMVWLYOYJK|          5|            0|          0|   N|
| RWT3H6HBVAL6G|          3|            0|          0|   N|
|R1S3T3GWUGQTW7|          5|            0|          0|   N|
|R3R0QYHA800REE|          4|            0|          0|   N|
|R1FR0EQPHPW5NM|          4|            1|          2|   N|
| RGORN81H45NI7|          5|            1|          1|   N|
|R1CNYN4ABNOJSU|          5|            0|          0|   N|
|R2DW06821PMN40|          3|            0|          0|   N|
|R1CS8AMA8B0VBJ|          5|            0|          0|   N|
|R343CPRI4MC9J0|          5|            0|          0|   N|
| R3XP0G8P2BOTP|          5|            

In [None]:
# Configure the RDS connection
import getpass

# Default write mode for the JDBC loads: "append" inserts rows into the existing table
# (Spark creates the table only if it does not exist yet).
mode = "append"
jdbc_url = "jdbc:postgresql://video-games-db.<removed>.us-east-1.rds.amazonaws.com:5432/video_game_DB"

# Never hard-code the database password in a notebook: it is saved with the file and
# travels with every shared or committed copy. Read it from the RDS_PASSWORD
# environment variable, falling back to an interactive prompt.
rds_password = os.environ.get("RDS_PASSWORD") or getpass.getpass("RDS password: ")
config = {"user": "postgres",
          "password": rds_password,
          "driver": "org.postgresql.Driver"}

In [None]:
# Load review_id_DF into review_id_table using the configured write mode
review_id_DF.write.mode(mode).jdbc(jdbc_url, "review_id_table", properties=config)

In [None]:
# Write products_DF to the products table in RDS.
# Use the shared write mode ("append"). With mode="ignore", Spark silently skips the
# whole write whenever the table already exists, so a table pre-created from the
# schema file would be left empty without any error.
products_DF.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)

In [None]:
# Write customers_DF to the customers table in RDS.
# Use the shared write mode ("append"): mode="ignore" would silently write nothing
# if the customers table already exists.
customers_DF.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)

In [None]:
# Write vine_table_DF to vine_table in RDS.
# Use the shared write mode ("append"): mode="ignore" would silently write nothing
# if vine_table already exists.
vine_table_DF.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)