In [0]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark
# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"
# Start a SparkSession
import findspark
findspark.init()

In [0]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-02-02 20:34:52--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.4’


2020-02-02 20:34:53 (1.41 MB/s) - ‘postgresql-42.2.9.jar.4’ saved [914037/914037]



In [0]:
from pyspark.sql import SparkSession

# Build (or reuse) the session with the PostgreSQL JDBC jar on the driver classpath
spark = (
    SparkSession.builder
    .appName("BigDataMod16Challenge")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

# Extract

In [0]:
# Read the gzipped, tab-separated Sports review file from the public S3 bucket
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Sports_v1_00.tsv.gz"
# Register the remote file with Spark, then resolve the local path it was fetched to
spark.sparkContext.addFile(url)
reviews_local_path = SparkFiles.get("amazon_reviews_us_Sports_v1_00.tsv.gz")
# The first line of the file supplies the column names
review_data_df = spark.read.csv(reviews_local_path, sep="\t", header=True)

In [0]:
# Show DataFrame: first 10 rows, then the column names and types.
# (A bare `review_data_df.head()` in the middle of a cell is never displayed --
# only a cell's last expression is echoed -- so it was dropped; it only cost an
# extra Spark job.)
review_data_df.show(10)
review_data_df.printSchema()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   48945260|R1WBPB8MDCCN8F|B012P7UPSM|     409940130|Chicago Blackhawk...|          Sports|          5|            0|          0|   N|                N|   LOVE IT. 6 stars!|Bought this last ...| 2015-08-31|
|         US|    5782091|R32M0YEWV77XG8|B001GQ3VHG|     657746679|Copag Poker Size ...|          Sports|          5|    

## Count the number of records

---



---



In [0]:
# Number of rows in the raw reviews DataFrame, before any cleaning
review_data_df.count()

4850360

## There are 4850360 records in the raw sports review dataset

---



---

# TRANSFORM

In [0]:
# Remove every row that contains a null in any column, then preview the result
dropna_df = review_data_df.dropna(how="any")
dropna_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   48945260|R1WBPB8MDCCN8F|B012P7UPSM|     409940130|Chicago Blackhawk...|          Sports|          5|            0|          0|   N|                N|   LOVE IT. 6 stars!|Bought this last ...| 2015-08-31|
|         US|    5782091|R32M0YEWV77XG8|B001GQ3VHG|     657746679|Copag Poker Size ...|          Sports|          5|    

In [0]:
# Number of rows left after the rows containing nulls were removed
dropna_df.count()

4848999

## There are 4848999 records remaining after dropping rows that contain NULL values

In [0]:
#Load in a sql function to use columns
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DateType

In [0]:
# Review-ID table: the review, customer and product identifiers plus the review date
reviewID_df = dropna_df.select(
    "review_id",
    "customer_id",
    "product_id",
    "product_parent",
    "review_date",
)
reviewID_df.show()
reviewID_df.printSchema()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1WBPB8MDCCN8F|   48945260|B012P7UPSM|     409940130| 2015-08-31|
|R32M0YEWV77XG8|    5782091|B001GQ3VHG|     657746679| 2015-08-31|
| RR8V7WR27NXJ5|   45813853|B008VS8M58|     962442336| 2015-08-31|
|R1MHO5V9Z932AY|    1593730|B005F06F4U|      74305227| 2015-08-31|
|R16PD71086BD2V|   29605511|B010T4IE2C|     787185588| 2015-08-31|
|R1Z8IFGWTRWXT6|   11112959|B004RKJGLS|      94127483| 2015-08-31|
|R3AUMSHAW73HWN|     108031|B005V3DCBU|     526977496| 2015-08-31|
|R2KWDWFOHGX6FL|   13981540|B00MHT9WN8|      26521265| 2015-08-31|
|R3H9543FWBWFBU|   37993909|B001CSIRQ8|     652431165| 2015-08-31|
| RUANXOQ9W3OU5|   26040213|B001KZ3NOO|     635861713| 2015-08-31|
|R31673RTGEZSW7|   34657602|B00005RCQS|      72099763| 2015-08-31|
|R22OQLFSH42RCM|   14346192|B00FA7RWVI|     757354022| 2015-08

In [0]:
# Cast the numeric identifiers and the date for the review-ID table.
# reviewID_df already holds exactly the five target columns in order, so no
# re-select is needed; withColumn replaces each column in place and keeps the
# column order. col() resolves names against the frame being transformed
# instead of reaching back to an ancestor DataFrame.
reviewID_clean_df = (
    reviewID_df
    .withColumn("customer_id", col("customer_id").cast(IntegerType()))
    .withColumn("product_parent", col("product_parent").cast(IntegerType()))
    .withColumn("review_date", col("review_date").cast(DateType()))
)
reviewID_clean_df.printSchema()
reviewID_clean_df.show()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: date (nullable = true)

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1WBPB8MDCCN8F|   48945260|B012P7UPSM|     409940130| 2015-08-31|
|R32M0YEWV77XG8|    5782091|B001GQ3VHG|     657746679| 2015-08-31|
| RR8V7WR27NXJ5|   45813853|B008VS8M58|     962442336| 2015-08-31|
|R1MHO5V9Z932AY|    1593730|B005F06F4U|      74305227| 2015-08-31|
|R16PD71086BD2V|   29605511|B010T4IE2C|     787185588| 2015-08-31|
|R1Z8IFGWTRWXT6|   11112959|B004RKJGLS|      94127483| 2015-08-31|
|R3AUMSHAW73HWN|     108031|B005V3DCBU|     526977496| 2015-08-31|
|R2KWDWFOHGX6FL|   13981540|B00MHT9WN8|      26521265| 2015-08-31|
|R3H9543FWBWFBU|   37993909|B001CSIRQ8|   

In [0]:
# Products table: keep a single (product_id, product_title) row per product_id
products_df_TMP = dropna_df.select("product_id", "product_title")
# NOTE(review): when one product_id appears with several titles, which title
# survives de-duplication is not controlled here -- confirm that is acceptable.
products_clean_df = products_df_TMP.dropDuplicates(subset=["product_id"])
products_clean_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|1570340439|Compass & Map Nav...|
|9879000633|Swordmaster - Uni...|
|B00004RAN1|24x7x7 Pro Cage Trap|
|B00005OU7P|Putt and Return P...|
|B00005V3EN|Airzone Trampolin...|
|B00008Z7M2|Koolatron 401619 ...|
|B0000AAEMO|Seahawks Franklin...|
|B0000AI0K1|Outdoor Cap Heavy...|
|B0000ANGL1|NFL Men's New Yor...|
|B0000ANHRR|Heatgear Loose Lo...|
|B0000AU3AK|Everlast Genuine ...|
|B0000AUTWD|Cubby 5011 Mini Mite|
|B0000AVZBW|G- Lox  Deerskin ...|
|B0000AXJT5|Pro Mex Professio...|
|B0000AXNWO|   Ande Fluorocarbon|
|B0000AXUET|Star brite Brush ...|
|B0000AY199|DU-BRO Fishing Tr...|
|B0000AYEE8|Attwood Stainiles...|
|B0000AYGZ1|Attwood Stainless...|
|B0000AZ8LS|Tacstar 4-Shot Re...|
+----------+--------------------+
only showing top 20 rows



In [0]:
# Row count before de-duplication (one row per review)
products_df_TMP.count()


4848999

In [0]:
# Row count after de-duplication on product_id (number of distinct products)
products_clean_df.count()

1046060

In [0]:
# Customers table: count how many rows each customer_id has in the cleaned data
customers_df = dropna_df.select("customer_id")
customers_df_TMP = (
    customers_df
    .groupBy("customer_id")
    .count()
)

customers_df_TMP.show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|   45193257|   21|
|   44934924|    1|
|   11614909|    1|
|   19186664|    1|
|   50492011|    1|
|   12988293|    3|
|   22848554|    4|
|   42126527|    1|
|   48782599|    1|
|   34146651|   10|
|   43194960|    3|
|   14127895|    1|
|   30636778|   44|
|   30274993|    1|
|   28069251|    1|
|   20545025|    5|
|   28029004|    4|
|   51290010|    1|
|   37367363|    1|
|    2583392|    7|
+-----------+-----+
only showing top 20 rows



In [0]:
# Customers Table clean
# Rename the aggregate column and cast both columns to integers in one chain.
# The grouped frame already contains just (customer_id, count), so no re-select
# is needed, and the result name is bound once instead of being reassigned
# three times.
customers_CLEAN_df = (
    customers_df_TMP
    .withColumnRenamed("count", "customer_count")
    .withColumn("customer_count", col("customer_count").cast(IntegerType()))
    .withColumn("customer_id", col("customer_id").cast(IntegerType()))
)

customers_CLEAN_df.printSchema()
customers_CLEAN_df.show(5)

root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: integer (nullable = false)

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   45193257|            21|
|   44934924|             1|
|   11614909|             1|
|   19186664|             1|
|   50492011|             1|
+-----------+--------------+
only showing top 5 rows



In [0]:
 # Create Vine Table
# Columns needed for the vine table
vine_df = dropna_df.select("review_id", "star_rating", "helpful_votes", "total_votes", "vine")

# Cast the rating and vote counts to integers. withColumn replaces each column
# in place (order preserved), so the intermediate re-selects are unnecessary;
# col() resolves names against the frame being transformed rather than an
# ancestor DataFrame.
vine_CLEAN_df = (
    vine_df
    .withColumn("star_rating", col("star_rating").cast(IntegerType()))
    .withColumn("helpful_votes", col("helpful_votes").cast(IntegerType()))
    .withColumn("total_votes", col("total_votes").cast(IntegerType()))
)

vine_CLEAN_df.printSchema()
vine_CLEAN_df.show()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R1WBPB8MDCCN8F|          5|            0|          0|   N|
|R32M0YEWV77XG8|          5|            1|          1|   N|
| RR8V7WR27NXJ5|          1|            0|          0|   N|
|R1MHO5V9Z932AY|          5|            0|          0|   N|
|R16PD71086BD2V|          5|            0|          1|   N|
|R1Z8IFGWTRWXT6|          3|            0|          0|   N|
|R3AUMSHAW73HWN|          4|            2|          3|   N|
|R2KWDWFOHGX6FL|          5|            0|          0|   N|
|R3H9543FWBWFBU|          5|            1|          1|   N|
| RUANXOQ9W3OU5|          5|            0|          0|   N|
|R3

In [0]:
# LOAD
# Configure settings for RDS
import os
from getpass import getpass

# NOTE(review): with "append", re-running the write cells inserts the same rows
# again -- confirm the target tables are empty (or keyed) before a re-run.
mode = "append"
jdbc_url="jdbc:postgresql://jmerkel-bigdata-challenge.cg5lg3umcaun.us-west-1.rds.amazonaws.com:5432/mod16_challenge"
# The database password must never live in the notebook. Read it from the
# RDS_PASSWORD environment variable, or prompt for it (input is not echoed and
# is not stored in the notebook). The password that was previously committed
# here should be considered leaked and rotated.
config = {"user":"postgres",
          "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
          "driver":"org.postgresql.Driver"}

In [0]:
# Load the cleaned review-ID rows into the `review_id_table` table in RDS
reviewID_clean_df.write.jdbc(
    url=jdbc_url,
    table="review_id_table",
    mode=mode,
    properties=config,
)

In [0]:
# Load the de-duplicated products into the `products` table in RDS
products_clean_df.write.jdbc(
    url=jdbc_url,
    table="products",
    mode=mode,
    properties=config,
)

In [0]:
# Load the per-customer counts into the `customers` table in RDS
customers_CLEAN_df.write.jdbc(
    url=jdbc_url,
    table="customers",
    mode=mode,
    properties=config,
)

In [0]:
# Load the typed vine rows into the `vine_table` table in RDS
vine_CLEAN_df.write.jdbc(
    url=jdbc_url,
    table="vine_table",
    mode=mode,
    properties=config,
)