<a href="https://colab.research.google.com/github/mhvarner/BigData/blob/master/SparkChallenge.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar xf spark-2.4.6-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

import pyspark

In [61]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-07-19 21:52:39--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2020-07-19 21:52:39 (4.79 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [92]:
os.environ['SPARK_CLASSPATH'] = "/content/postgresql-42.2.9.jar"

In [4]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Amazon_Sports").config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar").getOrCreate()

In [69]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Sports_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Sports_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)

In [70]:
# Show DataFrame
df.show(5)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   48945260|R1WBPB8MDCCN8F|B012P7UPSM|     409940130|Chicago Blackhawk...|          Sports|          5|            0|          0|   N|                N|   LOVE IT. 6 stars!|Bought this last ...|2015-08-31 00:00:00|
|         US|    5782091|R32M0YEWV77XG8|B001GQ3VHG|     657746679|Copag Poker Size ...| 

In [71]:
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [81]:
vine_table = vine_table.withColumn("star_rating", vine_table["star_rating"].cast(IntegerType()))

In [77]:
vine_table.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)



In [43]:
# Load in a sql function to use columns
from pyspark.sql.functions import col
# review_id_table
review_id_table = df.select(["review_id", "customer_id", "product_id", "product_parent", "review_date"])
review_id_table.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1WBPB8MDCCN8F|   48945260|B012P7UPSM|     409940130| 2015-08-31|
|R32M0YEWV77XG8|    5782091|B001GQ3VHG|     657746679| 2015-08-31|
| RR8V7WR27NXJ5|   45813853|B008VS8M58|     962442336| 2015-08-31|
|R1MHO5V9Z932AY|    1593730|B005F06F4U|      74305227| 2015-08-31|
|R16PD71086BD2V|   29605511|B010T4IE2C|     787185588| 2015-08-31|
|R1Z8IFGWTRWXT6|   11112959|B004RKJGLS|      94127483| 2015-08-31|
|R3AUMSHAW73HWN|     108031|B005V3DCBU|     526977496| 2015-08-31|
|R2KWDWFOHGX6FL|   13981540|B00MHT9WN8|      26521265| 2015-08-31|
|R3H9543FWBWFBU|   37993909|B001CSIRQ8|     652431165| 2015-08-31|
| RUANXOQ9W3OU5|   26040213|B001KZ3NOO|     635861713| 2015-08-31|
|R31673RTGEZSW7|   34657602|B00005RCQS|      72099763| 2015-08-31|
|R22OQLFSH42RCM|   14346192|B00FA7RWVI|     757354022| 2015-08

In [44]:
review_id_table.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- review_date: string (nullable = true)



In [35]:
# Filter for only columns with active users
#review_id_table = review_id_table.filter(col("review_date") is None)
#review_id_table.show()

from pyspark.sql import functions as F
#review_id_table.filter(F.isnull("review_date")).show()
review_id_table_bkup = review_id_table.na.drop(subset=["review_date"])

In [57]:
# products table
products = df.select(["product_id", "product_title"])
products = products.dropDuplicates(['product_id'])
products.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|1570340439|Compass & Map Nav...|
|9879000633|Swordmaster - Uni...|
|B00004RAN1|24x7x7 Pro Cage Trap|
|B00005OU7P|Putt and Return P...|
|B00005V3EN|Airzone Trampolin...|
|B00008Z7M2|Koolatron 401619 ...|
|B0000AAEMO|Seahawks Franklin...|
|B0000AI0K1|Outdoor Cap Heavy...|
|B0000ANGL1|NFL Men's New Yor...|
|B0000ANHRR|Heatgear Loose Lo...|
|B0000AU3AK|Everlast Genuine ...|
|B0000AUTWD|Cubby 5011 Mini Mite|
|B0000AVZBW|G- Lox  Deerskin ...|
|B0000AXJT5|Pro Mex Professio...|
|B0000AXNWO|   Ande Fluorocarbon|
|B0000AXUET|Star brite Brush ...|
|B0000AY199|DU-BRO Fishing Tr...|
|B0000AYEE8|Attwood Stainiles...|
|B0000AYGZ1|Attwood Stainless...|
|B0000AZ8LS|Tacstar 4-Shot Re...|
+----------+--------------------+
only showing top 20 rows



In [12]:
# customers table
from pyspark.sql.functions import countDistinct
customers = df.groupBy('customer_id').count()
customers.show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|   45193257|   21|
|   44934924|    1|
|   11614909|    1|
|   19186664|    1|
|   50492011|    1|
|   12988293|    3|
|   22848554|    4|
|   42126527|    1|
|   48782599|    1|
|   34146651|   10|
|   43194960|    3|
|   14127895|    1|
|   30636778|   44|
|   30274993|    1|
|   28069251|    1|
|   20545025|    5|
|   28029004|    4|
|   51290010|    1|
|   37367363|    1|
|    2583392|    7|
+-----------+-----+
only showing top 20 rows



In [62]:
customers=customers.withColumnRenamed("count","customer_count")

In [63]:
customers.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_count: long (nullable = false)



In [65]:
customers = customers.withColumn("customer_id", customers["customer_id"].cast(IntegerType()))
customers = customers.withColumn("customer_count", customers["customer_count"].cast(IntegerType()))

In [74]:
# vine_table
vine_table =  df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine"])
vine_table.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R1WBPB8MDCCN8F|          5|            0|          0|   N|
|R32M0YEWV77XG8|          5|            1|          1|   N|
| RR8V7WR27NXJ5|          1|            0|          0|   N|
|R1MHO5V9Z932AY|          5|            0|          0|   N|
|R16PD71086BD2V|          5|            0|          1|   N|
|R1Z8IFGWTRWXT6|          3|            0|          0|   N|
|R3AUMSHAW73HWN|          4|            2|          3|   N|
|R2KWDWFOHGX6FL|          5|            0|          0|   N|
|R3H9543FWBWFBU|          5|            1|          1|   N|
| RUANXOQ9W3OU5|          5|            0|          0|   N|
|R31673RTGEZSW7|          5|            2|          2|   N|
|R22OQLFSH42RCM|          5|            1|          1|   N|
|R12LEL4F3TSZUJ|          5|            2|          2|   N|
|R2L9XWD03072NI|          5|            

In [14]:
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://challengedb.cargnvj1ikcv.us-east-2.rds.amazonaws.com:5432/ChallengeDB"
config = {"user":"postgres", 
          "password": "135792468", 
          "driver": "org.postgresql.Driver"}

In [47]:
from pyspark.sql.types import IntegerType
review_id_table = review_id_table.withColumn("customer_id", review_id_table["customer_id"].cast(IntegerType()))
review_id_table = review_id_table.withColumn("product_parent", review_id_table["product_parent"].cast(IntegerType()))

In [45]:
from datetime import datetime
from pyspark.sql.functions import col, udf, unix_timestamp, to_date
from pyspark.sql.types import DateType


# Setting an user define function:
# This function converts the string cell into a date:
#func =  udf (lambda x: datetime.strptime(x, '%Y-%m-%d'), DateType())
#func =  udf (lambda x: datetime.strptime(x, '%Y-%m-%d'), DateType())

review_id_table = review_id_table.withColumn('review_date', to_date(unix_timestamp(col('review_date'), 'yyyy-MM-dd').cast("timestamp")))

review_id_table.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- review_date: date (nullable = true)



In [48]:
review_id_table.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: date (nullable = true)



In [49]:
# Write DataFrame to  review_id_table in RDS
review_id_table.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

In [17]:
review_id_table.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- review_date: string (nullable = true)



In [51]:
products.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_title: string (nullable = true)



In [58]:
print(products.select('product_id').distinct().count())
print(products.select('product_id').count())

1046150
1046150


In [59]:
# Write DataFrame to products table in RDS
products.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)

In [66]:
# Write DataFrame to customers table in RDS
customers.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)

In [67]:
vine_table.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)



In [78]:
# Write DataFrame to  vine_table in RDS
vine_table.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)

In [80]:
# Analysis Portion
vine_table = vine_table.filter(col("star_rating") == 5)
vine_table.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R1WBPB8MDCCN8F|          5|            0|          0|   N|
|R32M0YEWV77XG8|          5|            1|          1|   N|
|R1MHO5V9Z932AY|          5|            0|          0|   N|
|R16PD71086BD2V|          5|            0|          1|   N|
|R2KWDWFOHGX6FL|          5|            0|          0|   N|
|R3H9543FWBWFBU|          5|            1|          1|   N|
| RUANXOQ9W3OU5|          5|            0|          0|   N|
|R31673RTGEZSW7|          5|            2|          2|   N|
|R22OQLFSH42RCM|          5|            1|          1|   N|
|R12LEL4F3TSZUJ|          5|            2|          2|   N|
|R2L9XWD03072NI|          5|            1|          2|   N|
|R2K0U91HIACANO|          5|            0|          0|   N|
|R31XREAAMATEPY|          5|            0|          0|   N|
|R2116AVB87SO38|          5|            