In [4]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.1'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [1 InRelease 14.2 kB/88.7                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:5 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Ign:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:10 https://developer.download.nvid

In [5]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-02-26 23:13:03--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2022-02-26 23:13:04 (5.30 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [6]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BigData-HW-1").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

# Load Amazon Data into Spark DataFrame

In [17]:
from pyspark import SparkFiles

# Access Amazon reviews for Health & Personal Care products:
reviews_url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Health_Personal_Care_v1_00.tsv.gz"
filename = reviews_url.split("/")[len(reviews_url.split("/"))-1]
spark.sparkContext.addFile(reviews_url)
start_df = spark.read.csv(SparkFiles.get(filename), sep="\t", header=True, inferSchema=True).dropna(how="any")
start_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|    product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|     650634| R3EQSTM9PWRAL|B0091LBZSU|     578484426|Demograss Capsule...|Health & Personal...|          3|            0|          0|   N|                Y|         Three Stars|Only came with 30...| 2015-08-31|
|         US|   19827510| RBWPRK17XKIXD|B00PWW3LQ6|     456433146|Viva Labs #1 Prem...|Health & Personal

# Determine number of reviews

In [18]:
# Count the rows in DataFrame:
num_reviews = start_df.count()
print(f"This data set contains {num_reviews:,} reviews.")

This data set contains 5,330,701 reviews.


# Cleaned up DataFrames to match tables

In [19]:
from pyspark.sql.functions import to_date

# Create the review_id DataFrame:
review_id_df = start_df.select(["review_id", "customer_id", "product_id", "product_parent", to_date("review_date", 'yyyy-MM-dd').alias("review_date")]).drop_duplicates(["review_id"])
review_id_df.printSchema()
review_id_df.show()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: date (nullable = true)

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R10000VJGO8MCK|    2083334|B00DU54RL0|     340307372| 2014-08-05|
|R10007LUZ5DLQQ|   52244216|B004HHPWMS|     751364015| 2014-11-28|
|R10008CAT4CGNA|   52760682|B000E95HP0|     890646754| 2014-07-25|
|R1000AJEXL9Q32|     960113|B00N3LR4JS|      30647045| 2015-06-12|
|R1000J3TT2MYV3|   50861671|B0000Y8H2E|     141660050| 2006-01-26|
|R1000JDI0VDXQD|   38376753|B00F6WRW2Q|     289111448| 2014-01-11|
|R1000MHSGYLONA|   20712206|B00J2RRECO|     365489360| 2015-01-24|
|R1000VN6RICGHJ|   44772349|B0068NH4CU|     695238852| 2015-08-21|
|R1000WJKFNFQ1G|   23294231|B00416V4AO|   

In [20]:
# Create the products DataFrame:
products_df = start_df.select(["product_id", "product_title"]).drop_duplicates()
products_df.printSchema()
products_df.show()

root
 |-- product_id: string (nullable = true)
 |-- product_title: string (nullable = true)

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00N2BW2PK|Fitbit Charge HR ...|
|B001D8ZGAM|Aquasentials Easy...|
|B002NVSCQI|Beans72 Organic B...|
|B001O0BL9E|Manuka Health - M...|
|B00AYIM9Y8|BulkSupplements P...|
|B00EXPSWAI|HSI PROFESSIONAL ...|
|B00CMK8M1A|Scott Extra Soft ...|
|B00P80AV74|Monster Energy Dr...|
|B00PIOZVMA|Johnson & Johnson...|
|B00787ZQFW|MaritzMayer Raspb...|
|B0071O8B3G|Lipozene Diet Pil...|
|B005H441UE|Emphaplex 90 Caps...|
|B004WPBYUY|Vitanica Yeast Ar...|
|B00SQN2CXQ|Cloruro de Magnes...|
|B00O9NIRUY|Jimmy Orange Bran...|
|B00Y34E9ZQ|Slight Touch Cont...|
|B00ISDMQ8U|Puracy Natural La...|
|B002VWJYL6|Drive Medical Win...|
|B00NWYMDJM|Premium Prenatal ...|
|B000BABW5Q|Snorepin™ – Anti ...|
+----------+--------------------+
only showing top 20 rows



In [21]:
# Create the customers DataFrame:
customers_df = start_df.groupBy("customer_id").count().withColumnRenamed("count", "customer_count")
customers_df.printSchema()
customers_df.show()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: long (nullable = false)

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|    4919528|             5|
|   51451778|             3|
|   12713799|             1|
|    8673341|             1|
|   42146698|             2|
|    1117644|             2|
|   28058398|             1|
|   14375645|             1|
|   24540309|             1|
|   39715602|             1|
|     654272|             1|
|    9015608|             5|
|   43920023|            42|
|   38209321|             1|
|   12328685|             1|
|    2866605|             2|
|   38273165|             5|
|   41066514|             1|
|   28377689|             2|
|   17018784|             2|
+-----------+--------------+
only showing top 20 rows



In [22]:
# select columns from vine_df
# Create the vine DataFrame:
vine_df = start_df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine"])
vine_df.printSchema()
vine_df.show()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
| R3EQSTM9PWRAL|          3|            0|          0|   N|
| RBWPRK17XKIXD|          5|            0|          0|   N|
| RRSLOAF273XFC|          5|            1|          2|   N|
|R3S8W9Q6SWIT8O|          4|            0|          0|   N|
|R3QQ6NSLRVBFJC|          4|            0|          0|   N|
|R2XYDBMHUVJCSX|          2|            0|          1|   N|
|R1L94ESVC657A9|          5|            0|          0|   N|
|R3DI4B8LDWFQ3K|          1|            3|          4|   N|
|R1G7VV4HCXUQQL|          5|            0|          0|   N|
|R3TTNYN1TAMUSU|          5|            0|          0|   N|
|R3

# Push to AWS RDS instance

In [26]:
# Set RDS endpoint, database name, and password:
endpoint = ""
database = ""
password = ""

In [27]:
# set up properties to connect to AWS Postgres
mode = "overwrite"
jdbc_url = f"jdbc:postgresql://{endpoint}:5432/{database}"
config = {"user":"postgres", "password": password, "driver":"org.postgresql.Driver"}

In [28]:
# Write review_id_df to table in RDS
review_id_df.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

In [29]:
# Write products_df to table in RDS
products_df.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)

In [30]:
# Write customers_df to table in RDS
customers_df.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)

In [31]:
# Write vine_df to table in RDS
vine_df.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)