In [3]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.2.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.2.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3.2"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.36)] [Connecting to security.0% [1 InRelease gpgv 1,581 B] [Connecting to archive.ubuntu.com (185.125.190.36                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [1 InRelease gpgv 1,581 B] [Connecting to archive.ubuntu.com (185.125.190.360% [1 InRelease gpgv 1,581 B] [Connecting to archive.ubuntu.com (185.125.190.36                                                                               Get:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Get:4 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Ign:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  I

In [4]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-07-30 07:22:27--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2022-07-30 07:22:28 (6.43 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [5]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session. The Postgres JDBC jar goes on the driver
# classpath so the "org.postgresql.Driver" used by the RDS writes can be loaded.
spark = (
    SparkSession.builder
    .appName("M16-Amazon-Challenge")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

### Load Amazon Data into Spark DataFrame

In [6]:
from pyspark import SparkFiles

# S3 location of the Amazon US Video DVD reviews file (tab-separated).
url = "https://reviews-bucket-1.s3.amazonaws.com/amazon_reviews_us_Video_DVD_v1_00.tsv"
path = "amazon_reviews_us_Video_DVD_v1_00.tsv"

# Register the remote file with Spark, then resolve its local cached copy.
spark.sparkContext.addFile(url)
local_tsv = SparkFiles.get(path)

# The header row supplies column names; inferSchema scans the data to pick types.
df = (
    spark.read
    .option("encoding", "UTF-8")
    .csv(local_tsv, sep="\t", header=True, inferSchema=True)
)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   27288431| R33UPQQUZQEM8|B005T4ND06|     400024643|Yoga for Movement...|       Video DVD|          5|            3|          3|   N|                Y|This was a gift f...|This was a gift f...| 2015-08-31|
|         US|   13722556|R3IKTNQQPD9662|B004EPZ070|     685335564|  Something Borrowed|       Video DVD|          5|    

### Create DataFrames to match tables

In [34]:
from pyspark.sql.functions import to_date

# Build review_id_table from the already-loaded `df` (nothing is read from disk
# here): keep the review/customer/product key columns and parse review_date
# with the 'yyyy-MM-dd' pattern into a date column under the same name.
# NOTE(review): the "Create the review_id_table" cell further down builds the
# same DataFrame again; one of the two can be dropped.
review_id_table = df.select(
    "review_id",
    "customer_id",
    "product_id",
    "product_parent",
    to_date("review_date", "yyyy-MM-dd").alias("review_date"),
)
review_id_table.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
| R33UPQQUZQEM8|   27288431|B005T4ND06|     400024643| 2015-08-31|
|R3IKTNQQPD9662|   13722556|B004EPZ070|     685335564| 2015-08-31|
|R3U27V5QMCP27T|   20381037|B005S9EKCW|     922008804| 2015-08-31|
|R2TOH2QKNK4IOC|   24852644|B00FC1ZCB4|     326560548| 2015-08-31|
|R2XQG5NJ59UFMY|   15556113|B002ZG98Z0|     637495038| 2015-08-31|
|R1N1KHBRR4ZTX3|    6132474|B00X8RONBO|     896602391| 2015-08-31|
|R3OM9S0TCBP38K|   48049524|B000CEXFZG|     115883890| 2015-08-31|
|R1W4S949ZRCTBW|    3282516|B00ID8H8EW|     977932459| 2015-08-31|
|R18JL1NNQAZFV2|   51771179|B000TGJ8IU|     840084782| 2015-08-31|
|R1LP6PR06OPYUX|   31816501|B00DPMPTDS|     262144920| 2015-08-31|
| RZKBT035JA0UQ|   16164990|B00X797LUS|     883589001| 2015-08-31|
|R253N5W74SM7N3|   33386989|B00C6MXB42|     734735137| 2015-08

In [18]:
from pyspark.sql import functions as F

# Create the customers_table DataFrame: one row per customer_id with the number
# of rows (reviews) in `df` for that customer. The count is aliased directly
# rather than renaming Spark's auto-generated "count(customer_id)" column,
# because withColumnRenamed is a silent no-op if that generated name differs.
customers_table = (
    df.groupBy("customer_id")
    .agg(F.count("customer_id").alias("customer_count"))
)
customers_table.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   24423656|             8|
|   24297214|             1|
|   12879980|             3|
|     515450|             2|
|   13313689|             1|
|   15523729|             4|
|    1673863|             1|
|   14552054|             2|
|   45392827|            14|
|   44178035|             1|
|   10522786|            10|
|   44848424|             3|
|   14230926|             1|
|   49243158|           274|
|   49084939|            10|
|   28777148|            41|
|   41836864|             1|
|    5219946|             1|
|   37795150|             1|
|   52081222|             1|
+-----------+--------------+
only showing top 20 rows



In [11]:
# Create the products_table DataFrame with one row per product_id.
# NOTE(review): de-duplicating on product_id alone keeps an arbitrary row per
# id, so if a product appears with different titles the surviving one is not
# defined — confirm that is acceptable for the target table.
products_table = (
    df.select("product_id", "product_title")
    .dropDuplicates(["product_id"])
)
products_table.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|0471365831|Basic Electric Ci...|
|0607987162|Precipice Of Surv...|
|0739056042|Drumset 101: A Co...|
|0764009303|    Seeing Red [VHS]|
|0767026004|Greatest Blunders...|
|076781083X|Truth or Conseque...|
|0781766532|Interactive TEE R...|
|0783225911|     Rooster Cogburn|
|0789441020|Mountain Animals ...|
|0790742764|The Clan of the C...|
|0792292448|King Tut's Final ...|
|0792846834|Stranger Than Par...|
|0810123177|South Of Chicago:...|
|0965020495|The Twelve Fold P...|
|097144840X|Traveling to Olympia|
|1404986987|         Thumbsucker|
|1414391242|Simplify DVD Expe...|
|1419804987|What Ever Happene...|
|1440331588|Top Vibrant Water...|
|157252300X|       Padre Padrone|
+----------+--------------------+
only showing top 20 rows



In [12]:
# Number of rows (distinct product_ids) left in products_table after de-duplication.
product_count = products_table.count()
product_count

297919

In [20]:
# Create the review_id_table DataFrame from `df`.
# review_date is parsed into a date column with to_date(..., 'yyyy-MM-dd')
# and keeps its original name.
review_columns = ["review_id", "customer_id", "product_id", "product_parent"]
review_date_col = to_date("review_date", 'yyyy-MM-dd').alias("review_date")
review_id_table = df.select(review_columns + [review_date_col])
review_id_table.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
| R33UPQQUZQEM8|   27288431|B005T4ND06|     400024643| 2015-08-31|
|R3IKTNQQPD9662|   13722556|B004EPZ070|     685335564| 2015-08-31|
|R3U27V5QMCP27T|   20381037|B005S9EKCW|     922008804| 2015-08-31|
|R2TOH2QKNK4IOC|   24852644|B00FC1ZCB4|     326560548| 2015-08-31|
|R2XQG5NJ59UFMY|   15556113|B002ZG98Z0|     637495038| 2015-08-31|
|R1N1KHBRR4ZTX3|    6132474|B00X8RONBO|     896602391| 2015-08-31|
|R3OM9S0TCBP38K|   48049524|B000CEXFZG|     115883890| 2015-08-31|
|R1W4S949ZRCTBW|    3282516|B00ID8H8EW|     977932459| 2015-08-31|
|R18JL1NNQAZFV2|   51771179|B000TGJ8IU|     840084782| 2015-08-31|
|R1LP6PR06OPYUX|   31816501|B00DPMPTDS|     262144920| 2015-08-31|
| RZKBT035JA0UQ|   16164990|B00X797LUS|     883589001| 2015-08-31|
|R253N5W74SM7N3|   33386989|B00C6MXB42|     734735137| 2015-08

In [21]:
# Create the vine_table DataFrame: per-review rating, vote counts, and the
# vine / verified_purchase flags.
vine_columns = (
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
)
vine_table = df.select(*vine_columns)
vine_table.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| R33UPQQUZQEM8|          5|            3|          3|   N|                Y|
|R3IKTNQQPD9662|          5|            0|          0|   N|                Y|
|R3U27V5QMCP27T|          5|            1|          1|   N|                Y|
|R2TOH2QKNK4IOC|          5|            0|          1|   N|                Y|
|R2XQG5NJ59UFMY|          5|            0|          0|   N|                Y|
|R1N1KHBRR4ZTX3|          5|            0|          0|   N|                Y|
|R3OM9S0TCBP38K|          5|            0|          0|   N|                Y|
|R1W4S949ZRCTBW|          5|            0|          0|   N|                Y|
|R18JL1NNQAZFV2|          5|            0|          0|   N|                Y|
|R1LP6PR06OPYUX|          4|            0|          0|   N|     

### Connect to the AWS RDS instance and write each DataFrame to its table. 

In [14]:
import os
from getpass import getpass

# Configure settings for RDS.
# "append" adds rows to existing tables, so re-running the write cells below
# inserts the same rows again (or may fail if the tables enforce unique keys).
mode = "append"
jdbc_url = "jdbc:postgresql://postgres-database-1.cwrey1kave3l.us-east-1.rds.amazonaws.com:5432/postgres"

# The password is not stored in the notebook: it is read from the RDS_PASSWORD
# environment variable, or prompted for (input hidden) when that is unset.
# NOTE(review): the password previously hard-coded here was committed with the
# notebook and should be treated as leaked and rotated.
config = {
    "user": "postgres",
    "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
    "driver": "org.postgresql.Driver",
}

In [35]:
# Write review_id_table to the `review_id_table` table in RDS using the
# jdbc_url / mode / config settings from the RDS configuration cell.
review_writer = review_id_table.write
review_writer.jdbc(jdbc_url, 'review_id_table', mode=mode, properties=config)

In [16]:
# Write products_table to the `products_table` table in RDS.
# Took about 3 min in the recorded run.
products_writer = products_table.write
products_writer.jdbc(url=jdbc_url, table='products_table', mode=mode, properties=config)

In [44]:
# Write customers_table to the `customers_table` table in RDS.
# Took 5 min 14 s in the recorded run.
customers_table.write.mode(mode).jdbc(jdbc_url, 'customers_table', properties=config)

In [38]:
# Write vine_table to the `vine_table` table in RDS.
# Took about 11 minutes in the recorded run.
vine_table.write.jdbc(
    url=jdbc_url,
    table='vine_table',
    mode=mode,
    properties=config,
)